|
|
@@ -121,14 +121,19 @@ class Mq
|
|
|
if ($callback !== null) {
|
|
|
try {
|
|
|
$result = $callback(json_decode($message->body));
|
|
|
- if (\App\Tools\Tools::isStop()) {
|
|
|
- Log::debug('mq worker: .stop file exist. cancel the consumer.');
|
|
|
- $message->getChannel()->basic_cancel($message->getConsumerTag());
|
|
|
- }
|
|
|
+ $message->ack();
|
|
|
+ Log::debug(
|
|
|
+ 'mq done',
|
|
|
+ [
|
|
|
+ 'message_id' => $message->get('message_id')
|
|
|
+ ]
|
|
|
+ );
|
|
|
if ($result !== 0) {
|
|
|
throw new \Exception('task error');
|
|
|
}
|
|
|
} catch (\Exception $e) {
|
|
|
+ $message->nack();
|
|
|
+
|
|
|
// push to issues
|
|
|
Log::error('mq worker exception', ['exception' => $e]);
|
|
|
$channelName = 'issues';
|
|
|
@@ -145,15 +150,13 @@ class Mq
|
|
|
$channelIssues->basic_publish($msg, '', $channelName);
|
|
|
$channelIssues->close();
|
|
|
}
|
|
|
+
|
|
|
+ if (\App\Tools\Tools::isStop()) {
|
|
|
+ Log::debug('mq worker: .stop file exist. cancel the consumer.');
|
|
|
+ $message->getChannel()->basic_cancel($message->getConsumerTag());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- $message->ack();
|
|
|
- Log::debug(
|
|
|
- 'mq done',
|
|
|
- [
|
|
|
- 'message_id' => $message->get('message_id')
|
|
|
- ]
|
|
|
- );
|
|
|
|
|
|
//exit
|
|
|
foreach (config('mint.mq.loop_limit') as $key => $value) {
|
|
|
@@ -202,15 +205,12 @@ class Mq
|
|
|
register_shutdown_function($shutdown, $channel, $connection);
|
|
|
|
|
|
$timeout = 15;
|
|
|
- $deadline = time() + $timeout;
|
|
|
// Loop as long as the channel has callbacks registered
|
|
|
while ($channel->is_consuming()) {
|
|
|
try {
|
|
|
- $channel->wait(null, false, $deadline - time());
|
|
|
- } catch (\Throwable $th) {
|
|
|
- //throw $th;
|
|
|
+ $channel->wait(null, false, $timeout);
|
|
|
+ } catch (\AMQPTimeoutException $e) {
|
|
|
}
|
|
|
- // do something else
|
|
|
}
|
|
|
}
|
|
|
}
|