|
|
@@ -8,6 +8,7 @@ use PhpAmqpLib\Message\AMQPMessage;
|
|
|
use PhpAmqpLib\Wire\AMQPTable;
|
|
|
use Illuminate\Support\Facades\Log;
|
|
|
use Illuminate\Support\Str;
|
|
|
+use App\Exceptions\TaskFailException;
|
|
|
|
|
|
class RabbitMQService
|
|
|
{
|
|
|
@@ -188,36 +189,34 @@ class RabbitMQService
|
|
|
]);
|
|
|
|
|
|
// 执行回调处理消息
|
|
|
- $result = $callback($data, $retryCount);
|
|
|
-
|
|
|
- if ($result === true) {
|
|
|
- // 处理成功,确认消息
|
|
|
- $msg->ack();
|
|
|
- Log::info("Message processed successfully", ['delivery_tag' => $msg->getDeliveryTag()]);
|
|
|
- } else {
|
|
|
- // 处理失败,检查重试次数
|
|
|
- if ($retryCount < $maxRetries) {
|
|
|
- // 重新入队,延迟处理
|
|
|
- $this->requeueWithDelay($msg, $queueName, $retryCount + 1);
|
|
|
- Log::warning("Message requeued for retry", [
|
|
|
- 'delivery_tag' => $msg->getDeliveryTag(),
|
|
|
- 'retry_count' => $retryCount + 1
|
|
|
- ]);
|
|
|
- } else {
|
|
|
- // 超过重试次数,拒绝消息(进入死信队列)
|
|
|
- $msg->nack(false, false);
|
|
|
- Log::error("Message rejected after max retries", [
|
|
|
- 'delivery_tag' => $msg->getDeliveryTag(),
|
|
|
- 'retry_count' => $retryCount
|
|
|
- ]);
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (\Exception $e) {
|
|
|
+ $callback($data, $retryCount);
|
|
|
+ // 处理成功,确认消息
|
|
|
+ $msg->ack();
|
|
|
+ Log::info("Message processed successfully", ['delivery_tag' => $msg->getDeliveryTag()]);
|
|
|
+ } catch (TaskFailException $e) {
|
|
|
+ //no need requeue
|
|
|
Log::error("Error processing message", [
|
|
|
'error' => $e->getMessage(),
|
|
|
'delivery_tag' => $msg->getDeliveryTag()
|
|
|
]);
|
|
|
$msg->nack(false, false);
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ // 处理失败,检查重试次数
|
|
|
+ if ($retryCount < $maxRetries) {
|
|
|
+ // 重新入队,延迟处理
|
|
|
+ $this->requeueWithDelay($msg, $queueName, $retryCount + 1);
|
|
|
+ Log::warning("Message requeued for retry", [
|
|
|
+ 'delivery_tag' => $msg->getDeliveryTag(),
|
|
|
+ 'retry_count' => $retryCount + 1
|
|
|
+ ]);
|
|
|
+ } else {
|
|
|
+ // 超过重试次数,拒绝消息(进入死信队列)
|
|
|
+ $msg->nack(false, false);
|
|
|
+ Log::error("Message rejected after max retries", [
|
|
|
+ 'delivery_tag' => $msg->getDeliveryTag(),
|
|
|
+ 'retry_count' => $retryCount
|
|
|
+ ]);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
$iteration++;
|