| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- <?php
- namespace App\Services;
- use PhpAmqpLib\Connection\AMQPStreamConnection;
- use PhpAmqpLib\Channel\AMQPChannel;
- use PhpAmqpLib\Message\AMQPMessage;
- use PhpAmqpLib\Wire\AMQPTable;
- use Illuminate\Support\Facades\Log;
- use Illuminate\Support\Str;
- use App\Exceptions\TaskFailException;
/**
 * Wrapper around a single php-amqplib connection and channel.
 *
 * Declares durable queues (optionally wired to a dead-letter exchange/queue),
 * publishes JSON-encoded persistent messages, and runs a consumer that retries
 * failed messages through per-attempt delay queues.
 *
 * Connection settings, consumer settings and per-queue retry settings are read
 * from the `queue.connections.rabbitmq` config entry; queue topology (ttl,
 * max_length, dead-lettering) is read from `mint.rabbitmq.*`.
 */
class RabbitMQService
{
    /** @var AMQPStreamConnection|null Broker connection opened by connect(). */
    private $connection;

    /** @var AMQPChannel|null Channel opened on the connection. */
    private $channel;

    /** @var array|null Value of the `queue.connections.rabbitmq` config entry. */
    private $config;

    /**
     * Loads the connection config and immediately connects to the broker.
     */
    public function __construct()
    {
        $this->config = config('queue.connections.rabbitmq');
        $this->connect();
    }

    /**
     * Opens the connection and channel and limits prefetch to one message.
     */
    private function connect(): void
    {
        $settings = $this->config;

        $this->connection = new AMQPStreamConnection(
            $settings['host'],
            $settings['port'],
            $settings['user'],
            $settings['password'],
            $settings['virtual_host']
        );
        $this->channel = $this->connection->channel();

        // QoS: deliver only one unacknowledged message at a time.
        $this->channel->basic_qos(null, 1, null);
    }

    /**
     * Returns the channel this service operates on.
     */
    public function getChannel(): AMQPChannel
    {
        return $this->channel;
    }

    /**
     * Declares the named queue and, when configured, its dead-letter exchange
     * and dead-letter queue.
     *
     * Reads `mint.rabbitmq.queues.{queueName}` for the optional keys `ttl`,
     * `max_length`, `dead_letter_exchange` and `dead_letter_queue`. When a
     * dead-letter exchange is configured, `dead_letter_queue` is read as well
     * and its own limits come from `mint.rabbitmq.dead_letter_queues.{name}`.
     *
     * @param string $queueName Name of the main queue to declare.
     */
    public function setupQueue(string $queueName): void
    {
        $queueConfig = (array) config("mint.rabbitmq.queues.{$queueName}");
        $workerArgs = $this->limitArguments($queueConfig);

        if (isset($queueConfig['dead_letter_exchange'])) {
            $dlxName = $queueConfig['dead_letter_exchange'];
            $dlqName = $queueConfig['dead_letter_queue'];

            // Durable, non-auto-deleted direct exchange for dead letters.
            $this->channel->exchange_declare($dlxName, 'direct', false, true, false);

            $dlqConfig = (array) config("mint.rabbitmq.dead_letter_queues.{$dlqName}", []);
            $this->declareDurableQueue($dlqName, $this->limitArguments($dlqConfig));

            // The main queue dead-letters with routing key = DLQ name (set
            // below), and a direct exchange only routes on an exact key match,
            // so the binding must use that same key. Binding with the default
            // empty key would leave dead-lettered messages unroutable.
            $this->channel->queue_bind($dlqName, $dlxName, $dlqName);

            // Point the main queue at the dead-letter exchange.
            $workerArgs['x-dead-letter-exchange'] = $dlxName;
            $workerArgs['x-dead-letter-routing-key'] = $dlqName;
        }

        $this->declareDurableQueue($queueName, $workerArgs);
    }

    /**
     * Translates the optional `ttl` / `max_length` config keys into the
     * corresponding `x-message-ttl` / `x-max-length` queue arguments.
     *
     * @param array $config Queue config entry.
     * @return array Queue arguments keyed by AMQP argument name.
     */
    private function limitArguments(array $config): array
    {
        $arguments = [];
        if (isset($config['ttl'])) {
            $arguments['x-message-ttl'] = $config['ttl'];
        }
        if (isset($config['max_length'])) {
            $arguments['x-max-length'] = $config['max_length'];
        }

        return $arguments;
    }

    /**
     * Declares a durable, non-exclusive, non-auto-deleted queue.
     *
     * @param string $name      Queue name.
     * @param array  $arguments Queue arguments (wrapped in an AMQPTable).
     */
    private function declareDurableQueue(string $name, array $arguments): void
    {
        $this->channel->queue_declare(
            $name,
            false, // passive
            true,  // durable
            false, // exclusive
            false, // auto_delete
            false, // nowait
            new AMQPTable($arguments)
        );
    }

    /**
     * Declares the queue, then publishes $data to it as a persistent JSON
     * message via the default exchange.
     *
     * @param string $queueName Target queue (used as the routing key).
     * @param array  $data      Payload; JSON-encoded with unescaped unicode.
     * @return string The generated UUID used as the AMQP message_id.
     *
     * @throws \Exception Any failure is logged and rethrown, including a
     *                    \RuntimeException when the payload cannot be encoded.
     */
    public function publishMessage(string $queueName, array $data): string
    {
        try {
            $this->setupQueue($queueName);

            $body = json_encode($data, JSON_UNESCAPED_UNICODE);
            if ($body === false) {
                // Without this check the `false` result would be sent as an
                // empty message body.
                throw new \RuntimeException(
                    'Failed to JSON-encode message payload: ' . json_last_error_msg()
                );
            }

            // Str::uuid() returns a UUID object; the message property and the
            // declared return type both want a plain string.
            $msgId = (string) Str::uuid();

            $message = new AMQPMessage($body, [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'timestamp' => time(),
                'message_id' => $msgId,
                'content_type' => 'application/json; charset=utf-8',
            ]);
            $this->channel->basic_publish($message, '', $queueName);

            Log::info("Message published to queue: {$queueName} msg id={$msgId}");

            return $msgId;
        } catch (\Exception $e) {
            Log::error("Failed to publish message to queue: {$queueName}", [
                'error' => $e->getMessage(),
            ]);
            throw $e;
        }
    }

    /**
     * Consumes messages from the queue until the consumer is cancelled or
     * $maxIterations messages have been handled.
     *
     * Each message body is JSON-decoded to an array and passed to
     * `$callback($data, $retryCount)`. Outcomes:
     *  - callback returns: the message is acked;
     *  - callback throws TaskFailException: nack without requeue;
     *  - callback throws any other \Exception: the message is re-published to
     *    a delay queue while the retry count is below the queue's configured
     *    `retry_count`, otherwise nack without requeue.
     *
     * @param string   $queueName     Queue to consume from.
     * @param callable $callback      Handler invoked as ($data, $retryCount).
     * @param int|null $maxIterations Message limit; defaults to the
     *                                `consumer.max_iterations` config value.
     */
    public function consume(string $queueName, callable $callback, ?int $maxIterations = null): void
    {
        $this->setupQueue($queueName);

        $maxIterations = $maxIterations ?? $this->config['consumer']['max_iterations'];
        // Read once, before any message is touched: the value is constant for
        // the lifetime of the consumer and must be defined when the catch
        // branch below compares against it.
        $maxRetries = $this->config['queues'][$queueName]['retry_count'];
        $iteration = 0;

        $consumerCallback = function (AMQPMessage $msg) use ($callback, $queueName, $maxRetries, &$iteration) {
            // Initialised outside the try so the catch branch never sees an
            // undefined variable if reading the header itself fails.
            $retryCount = 0;

            try {
                $data = json_decode($msg->getBody(), true);
                $retryCount = $this->getRetryCount($msg);

                Log::info("Processing message from queue: {$queueName}", [
                    'data' => $data,
                    'retry_count' => $retryCount,
                    'delivery_tag' => $msg->getDeliveryTag(),
                ]);

                // Hand the message to the caller-supplied handler.
                $callback($data, $retryCount);

                // Handler succeeded: acknowledge.
                $msg->ack();
                Log::info("Message processed successfully", ['delivery_tag' => $msg->getDeliveryTag()]);
            } catch (TaskFailException $e) {
                // Permanent failure: reject without requeue, no retry.
                Log::error("Error processing message", [
                    'error' => $e->getMessage(),
                    'delivery_tag' => $msg->getDeliveryTag(),
                ]);
                $msg->nack(false, false);
            } catch (\Exception $e) {
                if ($retryCount < $maxRetries) {
                    // Retries left: re-publish through a delay queue.
                    $this->requeueWithDelay($msg, $queueName, $retryCount + 1);
                    Log::warning("Message requeued for retry", [
                        'delivery_tag' => $msg->getDeliveryTag(),
                        'retry_count' => $retryCount + 1,
                    ]);
                } else {
                    // Retries exhausted: reject (goes to the dead-letter
                    // queue when one is configured).
                    $msg->nack(false, false);
                    Log::error("Message rejected after max retries", [
                        'delivery_tag' => $msg->getDeliveryTag(),
                        'retry_count' => $retryCount,
                    ]);
                }
            }

            $iteration++;
        };

        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume($queueName, '', false, false, false, false, $consumerCallback);

        Log::info("Starting consumer for queue: {$queueName}", ['max_iterations' => $maxIterations]);

        // NOTE(review): wait() is called with the configured timeout; per the
        // php-amqplib docs a positive timeout that elapses without a frame
        // raises a timeout exception, which would propagate out of this
        // method. Confirm that ending the consumer on an idle queue is the
        // intended behaviour.
        while ($this->channel->is_consuming() && $iteration < $maxIterations) {
            $this->channel->wait(null, false, $this->config['consumer']['sleep_between_iterations']);
        }

        Log::info("Consumer stopped", ['iterations_processed' => $iteration]);
    }

    /**
     * Reads the `x-retry-count` application header from a message.
     *
     * @param AMQPMessage $msg Message to inspect.
     * @return int The header value, or 0 when the header is absent.
     */
    private function getRetryCount(AMQPMessage $msg): int
    {
        $properties = $msg->get_properties();
        $headers = $properties['application_headers'] ?? null;

        // Headers arrive as an AMQPTable; convert to a plain array rather
        // than relying on array-style access to the table object.
        if ($headers instanceof AMQPTable) {
            $headers = $headers->getNativeData();
        }

        if (!is_array($headers) || !isset($headers['x-retry-count'])) {
            return 0;
        }

        return (int) $headers['x-retry-count'];
    }

    /**
     * Re-publishes a failed message to a per-attempt delay queue and acks the
     * original delivery.
     *
     * The delay queue `{queueName}_delay_{retryCount}` has a message TTL of
     * the queue's configured `retry_delay` and dead-letters expired messages
     * back to $queueName through the default exchange.
     *
     * The original body is forwarded verbatim and the original message
     * properties (message_id, timestamp, content_type, ...) are kept; only
     * the application headers are replaced with the new retry count.
     *
     * @param AMQPMessage $msg        The failed delivery.
     * @param string      $queueName  Queue the message should return to.
     * @param int         $retryCount Attempt number to stamp on the message.
     */
    private function requeueWithDelay(AMQPMessage $msg, string $queueName, int $retryCount): void
    {
        $delay = $this->config['queues'][$queueName]['retry_delay'];
        $delayQueueName = "{$queueName}_delay_{$retryCount}";

        // Delay queue: messages expire after $delay and are dead-lettered
        // back to the main queue.
        $this->declareDurableQueue($delayQueueName, [
            'x-message-ttl' => $delay,
            'x-dead-letter-exchange' => '',
            'x-dead-letter-routing-key' => $queueName,
        ]);

        // Reuse the raw body instead of decoding and re-encoding it, which
        // would turn an undecodable body into "null" and alter the encoding.
        $properties = $msg->get_properties();
        $properties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
        $properties['application_headers'] = new AMQPTable([
            'x-retry-count' => $retryCount,
        ]);

        $this->channel->basic_publish(
            new AMQPMessage($msg->getBody(), $properties),
            '',
            $delayQueueName
        );

        // The copy is queued; release the original delivery.
        $msg->ack();
    }

    /**
     * Closes the channel and then the connection.
     *
     * The connection is closed even if closing the channel throws.
     */
    public function close(): void
    {
        try {
            if ($this->channel) {
                $this->channel->close();
            }
        } finally {
            if ($this->connection) {
                $this->connection->close();
            }
        }
    }
}
|