|
|
@@ -19,12 +19,15 @@ abstract class BaseRabbitMQJob implements ShouldQueue
|
|
|
protected $currentRetryCount = 0;
|
|
|
protected $tries = 0;
|
|
|
protected $timeout = 0;
|
|
|
+ protected $messageId = null;
|
|
|
+ protected $stop = false;
|
|
|
|
|
|
- public function __construct(string $queueName, array $messageData, int $retryCount = 0)
|
|
|
+ public function __construct(string $queueName, string $messageId, array $messageData, int $retryCount = 0)
|
|
|
{
|
|
|
$this->queueName = $queueName;
|
|
|
$this->messageData = $messageData;
|
|
|
$this->currentRetryCount = $retryCount;
|
|
|
+ $this->messageId = $messageId;
|
|
|
|
|
|
// 从配置读取重试次数和超时时间
|
|
|
$queueConfig = config("mint.rabbitmq.queues.{$queueName}");
|
|
|
@@ -32,12 +35,12 @@ abstract class BaseRabbitMQJob implements ShouldQueue
|
|
|
$this->timeout = $queueConfig['timeout'] ?? 300;
|
|
|
}
|
|
|
|
|
|
- public function handle($messageId = null)
|
|
|
+ public function handle()
|
|
|
{
|
|
|
try {
|
|
|
Log::info("开始处理队列消息", [
|
|
|
'queue' => $this->queueName,
|
|
|
- 'message_id' => $this->messageData['id'] ?? 'unknown',
|
|
|
+ 'message_id' => $this->messageId ?? 'unknown',
|
|
|
'retry_count' => $this->currentRetryCount
|
|
|
]);
|
|
|
|
|
|
@@ -46,7 +49,7 @@ abstract class BaseRabbitMQJob implements ShouldQueue
|
|
|
|
|
|
Log::info("队列消息处理完成", [
|
|
|
'queue' => $this->queueName,
|
|
|
- 'message_id' => $this->messageData['id'] ?? 'unknown',
|
|
|
+ 'message_id' => $this->messageId ?? 'unknown',
|
|
|
'result' => $result
|
|
|
]);
|
|
|
|
|
|
@@ -54,7 +57,7 @@ abstract class BaseRabbitMQJob implements ShouldQueue
|
|
|
} catch (\Exception $e) {
|
|
|
Log::error("队列消息处理失败", [
|
|
|
'queue' => $this->queueName,
|
|
|
- 'message_id' => $this->messageData['id'] ?? 'unknown',
|
|
|
+ 'message_id' => $this->messageId ?? 'unknown',
|
|
|
'error' => $e->getMessage(),
|
|
|
'retry_count' => $this->currentRetryCount,
|
|
|
'max_retries' => $this->tries
|
|
|
@@ -73,7 +76,7 @@ abstract class BaseRabbitMQJob implements ShouldQueue
|
|
|
{
|
|
|
Log::error("队列消息最终失败", [
|
|
|
'queue' => $this->queueName,
|
|
|
- 'message_id' => $this->messageData['id'] ?? 'unknown',
|
|
|
+ 'message_id' => $this->messageId ?? 'unknown',
|
|
|
'error' => $exception->getMessage(),
|
|
|
'retry_count' => $this->currentRetryCount
|
|
|
]);
|
|
|
@@ -90,7 +93,7 @@ abstract class BaseRabbitMQJob implements ShouldQueue
|
|
|
// 默认实现:记录日志
|
|
|
Log::error("消息处理最终失败,准备发送到死信队列", [
|
|
|
'queue' => $this->queueName,
|
|
|
- 'message_data' => $messageData,
|
|
|
+ 'message_id' => $this->messageId ?? 'unknown',
|
|
|
'error' => $exception->getMessage()
|
|
|
]);
|
|
|
}
|
|
|
@@ -104,4 +107,12 @@ abstract class BaseRabbitMQJob implements ShouldQueue
|
|
|
{
|
|
|
return $this->currentRetryCount;
|
|
|
}
|
|
|
+ public function stop()
|
|
|
+ {
|
|
|
+ $this->stop = true;
|
|
|
+ }
|
|
|
+ public function isStop()
|
|
|
+ {
|
|
|
+ return $this->stop;
|
|
|
+ }
|
|
|
}
|