|
|
@@ -2,13 +2,20 @@ import logging
|
|
|
import tomllib
|
|
|
|
|
|
import pika
|
|
|
+from redis.cluster import RedisCluster
|
|
|
|
|
|
from .worker import handle_message
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
-def start_consumer(name, queue, config):
|
|
|
+def open_redis_cluster(config):
|
|
|
+ cli = RedisCluster(host=config['host'], port=config['port'])
|
|
|
+ logger.debug("%s", cli.get_nodes())
|
|
|
+ return (cli, config['namespace'])
|
|
|
+
|
|
|
+
|
|
|
+def start_consumer(context, name, queue, config):
|
|
|
connection = pika.BlockingConnection(
|
|
|
pika.ConnectionParameters(
|
|
|
host=config['host'], port=config['port'],
|
|
|
@@ -20,7 +27,8 @@ def start_consumer(name, queue, config):
|
|
|
def callback(ch, method, properties, body):
|
|
|
logger.info("received message(%s,%s)",
|
|
|
properties.message_id, properties.content_type)
|
|
|
- handle_message(properties.message_id, properties.content_type, body)
|
|
|
+ handle_message(context, properties.message_id,
|
|
|
+ properties.content_type, body)
|
|
|
|
|
|
channel.basic_consume(
|
|
|
queue=queue, on_message_callback=callback, auto_ack=True)
|
|
|
@@ -33,4 +41,5 @@ def launch(name, queue, config_file):
|
|
|
logger.debug('load configuration from %s', config_file)
|
|
|
with open(config_file, "rb") as config_fd:
|
|
|
config = tomllib.load(config_fd)
|
|
|
- start_consumer(name, queue, config['rabbitmq'])
|
|
|
+ redis_cli = open_redis_cluster(config['redis'])
|
|
|
+ start_consumer(redis_cli, name, queue, config['rabbitmq'])
|