|
|
@@ -6,6 +6,8 @@ use PhpAmqpLib\Connection\AMQPStreamConnection;
|
|
|
use PhpAmqpLib\Message\AMQPMessage;
|
|
|
use PhpAmqpLib\Exchange\AMQPExchangeType;
|
|
|
use Illuminate\Support\Facades\Log;
|
|
|
+use Illuminate\Support\Str;
|
|
|
+
|
|
|
|
|
|
class Mq
|
|
|
{
|
|
|
@@ -43,7 +45,15 @@ class Mq
|
|
|
$channel = $connection->channel();
|
|
|
$channel->queue_declare($channelName, false, true, false, false);
|
|
|
|
|
|
- $msg = new AMQPMessage(json_encode($message, JSON_UNESCAPED_UNICODE));
|
|
|
+ $msgId = Str::uuid();
|
|
|
+ Log::info('mq push message id=' . $msgId);
|
|
|
+ $msg = new AMQPMessage(
|
|
|
+ json_encode($message, JSON_UNESCAPED_UNICODE),
|
|
|
+ [
|
|
|
+ "message_id" => $msgId,
|
|
|
+ "content_type" => 'application/json; charset=utf-8'
|
|
|
+ ]
|
|
|
+ );
|
|
|
$channel->basic_publish($msg, '', $channelName);
|
|
|
|
|
|
$channel->close();
|
|
|
@@ -104,6 +114,10 @@ class Mq
|
|
|
* @param \PhpAmqpLib\Message\AMQPMessage $message
|
|
|
*/
|
|
|
$process_message = function ($message) use ($callback, $connection, $exchange, $queue) {
|
|
|
+ Log::debug('received message', [
|
|
|
+ 'message_id' => $message->get('message_id'),
|
|
|
+ 'content_type' => $message->get('content_type')
|
|
|
+ ]);
|
|
|
if ($callback !== null) {
|
|
|
try {
|
|
|
$result = $callback(json_decode($message->body));
|
|
|
@@ -132,10 +146,32 @@ class Mq
|
|
|
$channelIssues->close();
|
|
|
}
|
|
|
}
|
|
|
- Log::debug('ack');
|
|
|
- $message->ack();
|
|
|
-
|
|
|
|
|
|
+ $message->ack();
|
|
|
+ Log::debug(
|
|
|
+ 'mq done',
|
|
|
+ [
|
|
|
+ 'message_id' => $message->get('message_id')
|
|
|
+ ]
|
|
|
+ );
|
|
|
+
|
|
|
+ //kill me
|
|
|
+ foreach (config('mint.mq.loop_limit') as $key => $value) {
|
|
|
+ if ($queue === $key) {
|
|
|
+ if ($value > 0) {
|
|
|
+ if (isset($GLOBALS[$key])) {
|
|
|
+ $GLOBALS[$key]++;
|
|
|
+ } else {
|
|
|
+ $GLOBALS[$key] = 1;
|
|
|
+ }
|
|
|
+ if ($GLOBALS[$key] >= $value) {
|
|
|
+ //kill me
|
|
|
+ Log::debug('mq kill loop=' . $GLOBALS[$key]);
|
|
|
+ $message->getChannel()->basic_cancel($message->getConsumerTag());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
// Send a message with the string "quit" to cancel the consumer.
|
|
|
/*
|
|
|
if ($message->body === 'quit') {
|