|
@@ -115,7 +115,7 @@ class Mq
|
|
|
/**
|
|
/**
|
|
|
* @param \PhpAmqpLib\Message\AMQPMessage $message
|
|
* @param \PhpAmqpLib\Message\AMQPMessage $message
|
|
|
*/
|
|
*/
|
|
|
- $process_message = function ($message) use ($callback, $connection, $exchange, $queue) {
|
|
|
|
|
|
|
+ $process_message = function ($message) use ($callback, $queue) {
|
|
|
Log::debug('received message', [
|
|
Log::debug('received message', [
|
|
|
'message_id' => $message->get('message_id'),
|
|
'message_id' => $message->get('message_id'),
|
|
|
'content_type' => $message->get('content_type')
|
|
'content_type' => $message->get('content_type')
|
|
@@ -133,7 +133,8 @@ class Mq
|
|
|
throw new \Exception('task error');
|
|
throw new \Exception('task error');
|
|
|
}
|
|
}
|
|
|
} catch (\Exception $e) {
|
|
} catch (\Exception $e) {
|
|
|
- Log::error('mq worker exception', [
|
|
|
|
|
|
|
+ Log::error("mq worker {$queue} exception", [
|
|
|
|
|
+ 'queue' => $queue,
|
|
|
'message_id' => $message->get('message_id'),
|
|
'message_id' => $message->get('message_id'),
|
|
|
'exception' => $e
|
|
'exception' => $e
|
|
|
]);
|
|
]);
|
|
@@ -215,8 +216,8 @@ class Mq
|
|
|
try {
|
|
try {
|
|
|
$channel->wait(null, false, $timeout);
|
|
$channel->wait(null, false, $timeout);
|
|
|
} catch (AMQPTimeoutException $e) {
|
|
} catch (AMQPTimeoutException $e) {
|
|
|
- // ignore it
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // ignore it
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|