|
|
@@ -47,7 +47,13 @@ class RabbitMQService
|
|
|
{
|
|
|
$queueConfig = config("mint.rabbitmq.queues.{$queueName}");
|
|
|
|
|
|
-
|
|
|
+ $workerArgs = [];
|
|
|
+ if (isset($queueConfig['ttl'])) {
|
|
|
+ $workerArgs['x-message-ttl'] = $queueConfig['ttl'];
|
|
|
+ }
|
|
|
+ if (isset($queueConfig['max_length'])) {
|
|
|
+ $workerArgs['x-max-length'] = $queueConfig['max_length'];
|
|
|
+ }
|
|
|
|
|
|
// 创建死信交换机
|
|
|
if (isset($queueConfig['dead_letter_exchange'])) {
|
|
|
@@ -87,59 +93,68 @@ class RabbitMQService
|
|
|
$queueConfig['dead_letter_exchange']
|
|
|
);
|
|
|
|
|
|
- // 创建主队列,配置死信
|
|
|
- $arguments = new AMQPTable([
|
|
|
- 'x-dead-letter-exchange' => $queueConfig['dead_letter_exchange'],
|
|
|
- 'x-dead-letter-routing-key' => $queueConfig['dead_letter_queue'], // 死信路由键
|
|
|
- ]);
|
|
|
- } else {
|
|
|
- $workerArgs = [];
|
|
|
- if (isset($queueConfig['ttl'])) {
|
|
|
- $workerArgs['x-message-ttl'] = $queueConfig['ttl'];
|
|
|
- }
|
|
|
- if (isset($queueConfig['max_length'])) {
|
|
|
- $workerArgs['x-max-length'] = $queueConfig['max_length'];
|
|
|
- }
|
|
|
- $arguments = new AMQPTable($workerArgs);
|
|
|
+ // 主队列,配置死信
|
|
|
+ $workerArgs['x-dead-letter-exchange'] = $queueConfig['dead_letter_exchange'];
|
|
|
+ $workerArgs['x-dead-letter-routing-key'] = $queueConfig['dead_letter_queue'];
|
|
|
}
|
|
|
+ $arguments = new AMQPTable($workerArgs);
|
|
|
|
|
|
-
|
|
|
- $this->channel->queue_declare(
|
|
|
- $queueName,
|
|
|
- false, // passive
|
|
|
- true, // durable
|
|
|
- false, // exclusive
|
|
|
- false, // auto_delete
|
|
|
- false, // nowait
|
|
|
- $arguments
|
|
|
- );
|
|
|
+ // 检查队列是否存在
|
|
|
+ try {
|
|
|
+ $this->channel->queue_declare(
|
|
|
+ $queueName,
|
|
|
+ true, // passive
|
|
|
+ true, // durable
|
|
|
+ false, // exclusive
|
|
|
+ false, // auto_delete
|
|
|
+ false, // nowait
|
|
|
+ $arguments
|
|
|
+ );
|
|
|
+ //存在,不用创建
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ if (strpos($e->getMessage(), 'NOT_FOUND') !== false) {
|
|
|
+ Log::info("Queue $queueName does not exist.");
|
|
|
+ // 队列不存在,创建新队列
|
|
|
+ $this->channel->queue_declare(
|
|
|
+ $queueName,
|
|
|
+ false, // passive
|
|
|
+ true, // durable
|
|
|
+ false, // exclusive
|
|
|
+ false, // auto_delete
|
|
|
+ false, // nowait
|
|
|
+ $arguments
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ Log::error("Error checking queue $queueName: {$e->getMessage()}");
|
|
|
+ throw $e;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public function publishMessage(string $queueName, array $data): bool
|
|
|
{
|
|
|
try {
|
|
|
$this->setupQueue($queueName);
|
|
|
-
|
|
|
+ $msgId = Str::uuid();
|
|
|
$message = new AMQPMessage(
|
|
|
json_encode($data, JSON_UNESCAPED_UNICODE),
|
|
|
[
|
|
|
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
|
|
|
'timestamp' => time(),
|
|
|
- 'message_id' => Str::uuid(),
|
|
|
+ 'message_id' => $msgId,
|
|
|
"content_type" => 'application/json; charset=utf-8'
|
|
|
]
|
|
|
);
|
|
|
|
|
|
$this->channel->basic_publish($message, '', $queueName);
|
|
|
|
|
|
- Log::info("Message published to queue: {$queueName}", $data);
|
|
|
+ Log::info("Message published to queue: {$queueName} msg id={$msgId}");
|
|
|
return true;
|
|
|
} catch (\Exception $e) {
|
|
|
Log::error("Failed to publish message to queue: {$queueName}", [
|
|
|
'error' => $e->getMessage(),
|
|
|
- 'data' => $data
|
|
|
]);
|
|
|
- return false;
|
|
|
+ throw $e;
|
|
|
}
|
|
|
}
|
|
|
|