|
|
@@ -123,7 +123,6 @@ class Mq
|
|
|
if ($callback !== null) {
|
|
|
try {
|
|
|
$result = $callback(json_decode($message->getBody()));
|
|
|
- $message->ack();
|
|
|
Log::debug(
|
|
|
'mq done',
|
|
|
[
|
|
|
@@ -134,7 +133,6 @@ class Mq
|
|
|
throw new \Exception('task error');
|
|
|
}
|
|
|
} catch (\Exception $e) {
|
|
|
- $message->nack();
|
|
|
Log::error('mq worker exception', [
|
|
|
'message_id' => $message->get('message_id'),
|
|
|
'exception' => $e
|
|
|
@@ -198,7 +196,7 @@ class Mq
|
|
|
nowait:
|
|
|
callback: A PHP Callback
|
|
|
*/
|
|
|
- $channel->basic_consume($queue, $consumerTag, false, false, false, false, $process_message);
|
|
|
+ $channel->basic_consume($queue, $consumerTag, false, true, false, false, $process_message);
|
|
|
|
|
|
/**
|
|
|
* @param \PhpAmqpLib\Channel\AMQPChannel $channel
|
|
|
@@ -217,8 +215,8 @@ class Mq
|
|
|
try {
|
|
|
$channel->wait(null, false, $timeout);
|
|
|
} catch (AMQPTimeoutException $e) {
|
|
|
- } catch (AMQPProtocolChannelException $e) {
|
|
|
- }
|
|
|
+ // ignore it
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|