|
@@ -1,8 +1,9 @@
|
|
|
import logging
|
|
import logging
|
|
|
import tomllib
|
|
import tomllib
|
|
|
-
|
|
|
|
|
|
|
+import json
|
|
|
import pika
|
|
import pika
|
|
|
from redis.cluster import RedisCluster
|
|
from redis.cluster import RedisCluster
|
|
|
|
|
+from types import SimpleNamespace
|
|
|
|
|
|
|
|
from .worker import handle_message
|
|
from .worker import handle_message
|
|
|
|
|
|
|
@@ -16,22 +17,25 @@ def open_redis_cluster(config):
|
|
|
|
|
|
|
|
|
|
|
|
|
def start_consumer(context, name, queue, config):
|
|
def start_consumer(context, name, queue, config):
|
|
|
|
|
+ mq_config = config['rabbitmq']
|
|
|
connection = pika.BlockingConnection(
|
|
connection = pika.BlockingConnection(
|
|
|
pika.ConnectionParameters(
|
|
pika.ConnectionParameters(
|
|
|
- host=config['host'], port=config['port'],
|
|
|
|
|
|
|
+ host=mq_config['host'], port=mq_config['port'],
|
|
|
credentials=pika.PlainCredentials(
|
|
credentials=pika.PlainCredentials(
|
|
|
- config['user'], config['password']),
|
|
|
|
|
- virtual_host=config['virtual-host']))
|
|
|
|
|
|
|
+ mq_config['user'], mq_config['password']),
|
|
|
|
|
+ virtual_host=mq_config['virtual-host']))
|
|
|
channel = connection.channel()
|
|
channel = connection.channel()
|
|
|
|
|
|
|
|
def callback(ch, method, properties, body):
|
|
def callback(ch, method, properties, body):
|
|
|
logger.info("received message(%s,%s)",
|
|
logger.info("received message(%s,%s)",
|
|
|
properties.message_id, properties.content_type)
|
|
properties.message_id, properties.content_type)
|
|
|
- handle_message(context, properties.message_id,
|
|
|
|
|
- properties.content_type, body)
|
|
|
|
|
|
|
+ handle_message(context, ch, method, properties.message_id,
|
|
|
|
|
+ properties.content_type, json.loads(
|
|
|
|
|
+ body, object_hook=SimpleNamespace),
|
|
|
|
|
+ config['app']['api-url'], config['rabbitmq']['customer-timeout'])
|
|
|
|
|
|
|
|
channel.basic_consume(
|
|
channel.basic_consume(
|
|
|
- queue=queue, on_message_callback=callback, auto_ack=True)
|
|
|
|
|
|
|
+ queue=queue, on_message_callback=callback, auto_ack=False)
|
|
|
|
|
|
|
|
logger.info('start a consumer(%s) for queue(%s)', name, queue)
|
|
logger.info('start a consumer(%s) for queue(%s)', name, queue)
|
|
|
channel.start_consuming()
|
|
channel.start_consuming()
|
|
@@ -42,4 +46,7 @@ def launch(name, queue, config_file):
|
|
|
with open(config_file, "rb") as config_fd:
|
|
with open(config_file, "rb") as config_fd:
|
|
|
config = tomllib.load(config_fd)
|
|
config = tomllib.load(config_fd)
|
|
|
redis_cli = open_redis_cluster(config['redis'])
|
|
redis_cli = open_redis_cluster(config['redis'])
|
|
|
- start_consumer(redis_cli, name, queue, config['rabbitmq'])
|
|
|
|
|
|
|
+ logger.info('api-url:(%s)', config['app']['api-url'])
|
|
|
|
|
+ logger.info('customer-timeout:(%s)',
|
|
|
|
|
+ config['rabbitmq']['customer-timeout'])
|
|
|
|
|
+ start_consumer(redis_cli, name, queue, config)
|