|
|
@@ -11,6 +11,7 @@ use PhpAmqpLib\Exception\AMQPTimeoutException;
|
|
|
use PhpAmqpLib\Wire\AMQPTable;
|
|
|
use App\Services\RabbitMQService;
|
|
|
use App\Exceptions\SectionTimeoutException;
|
|
|
+use App\Exceptions\TaskFailException;
|
|
|
|
|
|
class RabbitMQWorker extends Command
|
|
|
{
|
|
|
@@ -146,11 +147,9 @@ class RabbitMQWorker extends Command
|
|
|
|
|
|
try {
|
|
|
// 执行业务逻辑
|
|
|
- $successful = $this->job->handle();
|
|
|
- if ($successful) {
|
|
|
- // 成功处理,确认消息
|
|
|
- $msg->ack();
|
|
|
- }
|
|
|
+ $this->job->handle();
|
|
|
+ // 成功处理,确认消息
|
|
|
+ $msg->ack();
|
|
|
|
|
|
$this->processedCount++;
|
|
|
|
|
|
@@ -158,7 +157,10 @@ class RabbitMQWorker extends Command
|
|
|
} catch (SectionTimeoutException $e) {
|
|
|
$msg->nack(true, false);
|
|
|
Log::warning('attempt to requeue the message message_id:' . $msg->get('message_id'));
|
|
|
+ } catch (TaskFailException $e) {
|
|
|
+ $msg->nack(false, false);
|
|
|
} catch (\Exception $e) {
|
|
|
+ //requeue
|
|
|
$this->handleJobException($msg, $data, $retryCount, $e);
|
|
|
}
|
|
|
} catch (\Exception $e) {
|