|
|
@@ -68,10 +68,29 @@ class Mq{
|
|
|
/**
|
|
|
* @param \PhpAmqpLib\Message\AMQPMessage $message
|
|
|
*/
|
|
|
- $process_message = function ($message) use($callback)
|
|
|
+ $process_message = function ($message) use($callback,$connection,$exchange,$queue)
|
|
|
{
|
|
|
if($callback !== null){
|
|
|
- $callback(json_decode($message->body));
|
|
|
+ try{
|
|
|
+ $result = $callback(json_decode($message->body));
|
|
|
+ if($result !== 0){
|
|
|
+ throw new \Exception('error');
|
|
|
+ }
|
|
|
+ }catch(\Exception $e){
|
|
|
+ // push to issues
|
|
|
+ $channelName = 'issues';
|
|
|
+ $channelIssues = $connection->channel();
|
|
|
+ $channelIssues->queue_declare($channelName, false, true, false, false);
|
|
|
+
|
|
|
+ $msg = new AMQPMessage(json_encode(['exchange'=>$exchange,
|
|
|
+ 'channel'=>$queue,
|
|
|
+ 'message'=>json_decode($message->body),
|
|
|
+ 'result'=>$result,
|
|
|
+ 'error'=>$e,
|
|
|
+ ],JSON_UNESCAPED_UNICODE));
|
|
|
+ $channelIssues->basic_publish($msg, '', $channelName);
|
|
|
+ $channelIssues->close();
|
|
|
+ }
|
|
|
}
|
|
|
$message->ack();
|
|
|
|