| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- import logging
- import tomllib
- import json
- import socket
- import os
- import pika
- from redis.cluster import RedisCluster
- from types import SimpleNamespace
- from .worker import handle_message
- logger = logging.getLogger(__name__)
def open_redis_cluster(config):
    """Connect to the Redis cluster described by *config*.

    Args:
        config: Mapping that provides the ``host``, ``port`` and
            ``namespace`` keys.

    Returns:
        A ``(client, namespace)`` tuple: the ``RedisCluster`` client and the
        ``namespace`` value taken unchanged from *config*.

    Raises:
        KeyError: If any of the three keys is missing from *config*.
    """
    host = config['host']
    port = config['port']
    namespace = config['namespace']

    logger.debug("open redis cluster tcp://%s:%s/%s", host, port, namespace)
    client = RedisCluster(host=host, port=port)
    # get_nodes() is evaluated regardless of the active log level.
    logger.debug("%s", client.get_nodes())
    return client, namespace
def start_consumer(context, name, config, queue, callback, proxy):
    """Connect to RabbitMQ and consume *queue* until consuming stops.

    Every delivery is JSON-decoded (objects become ``SimpleNamespace``
    instances) and passed to ``handle_message``.

    Args:
        context: Opaque value forwarded to ``handle_message``.
        name: Base consumer name; the host name and process id are appended
            to build the name reported to ``handle_message`` and the log.
        config: Mapping with the ``host``, ``port``, ``user``, ``password``,
            ``virtual-host`` and ``customer-timeout`` keys.
            ``customer-timeout`` is only written to the debug log here.
        queue: Name of the queue to consume from.
        callback: Opaque value forwarded to ``handle_message``.
        proxy: Opaque value forwarded to ``handle_message``.

    Raises:
        KeyError: If a required key is missing from *config*.

    Any exception raised while connecting or consuming propagates to the
    caller; the connection is closed first.
    """
    heartbeat = 3600
    logger.debug("open rabbitmq %s@%s:%d/%s with timeout %ds",
                 config['user'], config['host'], config['port'],
                 config['virtual-host'], config['customer-timeout'])

    credentials = pika.PlainCredentials(config['user'], config['password'])
    parameters = pika.ConnectionParameters(
        host=config['host'], port=config['port'],
        heartbeat=heartbeat,
        credentials=credentials,
        virtual_host=config['virtual-host'])

    # Built before the callback is defined so the handler visibly receives
    # the fully-qualified name instead of relying on a later rebinding of
    # the ``name`` parameter.
    consumer_name = "%s.%s.%d" % (name, socket.gethostname(), os.getpid())

    def _callback(ch, method, properties, body):
        logger.info("received message(%s,%s)",
                    properties.message_id, properties.content_type)
        payload = json.loads(body, object_hook=SimpleNamespace)
        handle_message(context, ch, method, properties.message_id,
                       properties.content_type, payload,
                       callback, proxy, heartbeat, consumer_name)

    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        # NOTE(review): auto_ack=True means the broker does not wait for an
        # explicit acknowledgement -- confirm this matches what
        # handle_message expects to do with ``ch``/``method``.
        channel.basic_consume(
            queue=queue, on_message_callback=_callback, auto_ack=True)
        logger.info('start a consumer(%s) for queue(%s)',
                    consumer_name, queue)
        channel.start_consuming()
    finally:
        # Release the socket however consuming ended (normal return,
        # KeyboardInterrupt, broker or handler error). close() raises on an
        # already-closed connection, hence the guard.
        if connection.is_open:
            connection.close()
def launch(name, queue, config_file):
    """Load the TOML configuration and run a consumer for *queue*.

    Reads *config_file*, opens the Redis cluster described by its ``redis``
    table, then calls ``start_consumer`` with the ``rabbitmq`` table. The
    call blocks for as long as ``start_consumer`` does.

    Args:
        name: Base consumer name, forwarded to ``start_consumer``.
        queue: Queue name, forwarded to ``start_consumer``.
        config_file: Path of the TOML configuration file. It must provide
            ``app.api-url`` plus the ``redis`` and ``rabbitmq`` tables;
            ``app.openai-proxy-url`` is optional.

    Raises:
        OSError: If *config_file* cannot be opened.
        tomllib.TOMLDecodeError: If the file is not valid TOML.
        KeyError: If a required table or key is missing.
    """
    logger.debug('load configuration from %s', config_file)
    with open(config_file, "rb") as config_fd:
        config = tomllib.load(config_fd)

    app_config = config['app']
    api_url = app_config['api-url']
    logger.debug('api-url:(%s)', api_url)

    # open_redis_cluster returns a (client, namespace) tuple; it is handed
    # to the consumer as its opaque context.
    redis_context = open_redis_cluster(config['redis'])

    openai_proxy = app_config.get('openai-proxy-url')
    # Lazy %-style arguments, consistent with the other log calls here.
    logger.debug('openai_proxy:(%s)', openai_proxy)

    start_consumer(redis_context, name,
                   config['rabbitmq'],
                   queue, api_url,
                   openai_proxy)
|