|
|
@@ -14,11 +14,13 @@ class RabbitMQService
|
|
|
{
|
|
|
private $connection;
|
|
|
private $channel;
|
|
|
- private $config;
|
|
|
+ private array $config;
|
|
|
+ private array $queues;
|
|
|
|
|
|
public function __construct()
|
|
|
{
|
|
|
$this->config = config('queue.connections.rabbitmq');
|
|
|
+ $this->queues = config('mint.rabbitmq.queues');
|
|
|
$this->connect();
|
|
|
}
|
|
|
|
|
|
@@ -44,6 +46,47 @@ class RabbitMQService
|
|
|
return $this->channel;
|
|
|
}
|
|
|
|
|
|
+ public function createQueue(): array
|
|
|
+ {
|
|
|
+ foreach ($this->queues as $name => $queue) {
|
|
|
+
|
|
|
+ $args = [];
|
|
|
+
|
|
|
+ // TTL
|
|
|
+ if (!empty($queue['ttl'])) {
|
|
|
+ $args['x-message-ttl'] = (int) $queue['ttl'];
|
|
|
+ }
|
|
|
+
|
|
|
+ // max length
|
|
|
+ if (!empty($queue['max_length'])) {
|
|
|
+ $args['x-max-length'] = (int) $queue['max_length'];
|
|
|
+ }
|
|
|
+
|
|
|
+ // dead letter exchange
|
|
|
+ if (!empty($queue['dead_letter_exchange'])) {
|
|
|
+ $args['x-dead-letter-exchange'] = $queue['dead_letter_exchange'];
|
|
|
+ }
|
|
|
+
|
|
|
+ // dead letter routing key(可选但建议)
|
|
|
+ if (!empty($queue['dead_letter_queue'])) {
|
|
|
+ $args['x-dead-letter-routing-key'] = $queue['dead_letter_queue'];
|
|
|
+ }
|
|
|
+
|
|
|
+ $this->channel->queue_declare(
|
|
|
+ $name, // queue 名称
|
|
|
+ false, // passive:检查是否存在(false=不存在则创建)
|
|
|
+ true, // durable:是否持久化(重启 RabbitMQ 后仍存在)
|
|
|
+ false, // exclusive:独占模式
|
|
|
+ false, // auto_delete:是否在无消费者时自动删除
|
|
|
+ false, // nowait:是否不等待服务器响应(false=阻塞等待确认)
|
|
|
+ new AMQPTable($args) // arguments:附加参数(TTL / DLX / max-length 等)
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ return array_keys($this->queues);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
public function setupQueue(string $queueName): void
|
|
|
{
|
|
|
$queueConfig = config("mint.rabbitmq.queues.{$queueName}");
|