|
|
@@ -13,19 +13,22 @@ logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
def open_redis_cluster(config):
|
|
|
+ logger.debug("open redis cluster tcp://%s:%s/%s",
|
|
|
+ config['host'], config['port'], config['namespace'])
|
|
|
cli = RedisCluster(host=config['host'], port=config['port'])
|
|
|
logger.debug("%s", cli.get_nodes())
|
|
|
return (cli, config['namespace'])
|
|
|
|
|
|
|
|
|
-def start_consumer(context, name, queue, config):
|
|
|
- mq_config = config['rabbitmq']
|
|
|
+def start_consumer(context, name, config, queue, callback):
|
|
|
+ logger.debug("open rabbitmq %s@%s:%d/%s with timeout %ds",
|
|
|
+ config['user'], config['host'], config['port'], config['virtual-host'], config['customer-timeout'])
|
|
|
connection = pika.BlockingConnection(
|
|
|
pika.ConnectionParameters(
|
|
|
- host=mq_config['host'], port=mq_config['port'],
|
|
|
+ host=config['host'], port=config['port'],
|
|
|
credentials=pika.PlainCredentials(
|
|
|
- mq_config['user'], mq_config['password']),
|
|
|
- virtual_host=mq_config['virtual-host']))
|
|
|
+ config['user'], config['password']),
|
|
|
+ virtual_host=config['virtual-host']))
|
|
|
channel = connection.channel()
|
|
|
|
|
|
def callback(ch, method, properties, body):
|
|
|
@@ -34,7 +37,7 @@ def start_consumer(context, name, queue, config):
|
|
|
handle_message(context, ch, method, properties.message_id,
|
|
|
properties.content_type, json.loads(
|
|
|
body, object_hook=SimpleNamespace),
|
|
|
- config['app']['api-url'], config['rabbitmq']['customer-timeout'])
|
|
|
+ callback, ['customer-timeout'])
|
|
|
|
|
|
channel.basic_consume(
|
|
|
queue=queue, on_message_callback=callback, auto_ack=False)
|
|
|
@@ -47,8 +50,7 @@ def launch(name, queue, config_file):
|
|
|
logger.debug('load configuration from %s', config_file)
|
|
|
with open(config_file, "rb") as config_fd:
|
|
|
config = tomllib.load(config_fd)
|
|
|
+ logger.debug('api-url:(%s)', config['app']['api-url'])
|
|
|
redis_cli = open_redis_cluster(config['redis'])
|
|
|
- logger.info('api-url:(%s)', config['app']['api-url'])
|
|
|
- logger.info('customer-timeout:(%s)',
|
|
|
- config['rabbitmq']['customer-timeout'])
|
|
|
- start_consumer(redis_cli, name, queue, config)
|
|
|
+ start_consumer(redis_cli, name,
|
|
|
+ config['rabbitmq'], queue, config['app']['api-url'])
|