__init__.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. import logging
  2. import tomllib
  3. import json
  4. import socket
  5. import os
  6. import pika
  7. from redis.cluster import RedisCluster
  8. from types import SimpleNamespace
  9. from .worker import handle_message
  10. logger = logging.getLogger(__name__)
  11. def open_redis_cluster(config):
  12. logger.debug("open redis cluster tcp://%s:%s/%s",
  13. config['host'], config['port'], config['namespace'])
  14. cli = RedisCluster(host=config['host'], port=config['port'])
  15. logger.debug("%s", cli.get_nodes())
  16. return (cli, config['namespace'])
  17. def start_consumer(context, name, config, queue, callback, proxy):
  18. HeartBeat = 3600
  19. logger.debug("open rabbitmq %s@%s:%d/%s with timeout %ds",
  20. config['user'], config['host'], config['port'], config['virtual-host'], config['customer-timeout'])
  21. connection = pika.BlockingConnection(
  22. pika.ConnectionParameters(
  23. host=config['host'], port=config['port'],
  24. heartbeat=HeartBeat,
  25. credentials=pika.PlainCredentials(
  26. config['user'], config['password']),
  27. virtual_host=config['virtual-host']))
  28. channel = connection.channel()
  29. def _callback(ch, method, properties, body):
  30. logger.info("received message(%s,%s)",
  31. properties.message_id, properties.content_type)
  32. handle_message(context, ch, method, properties.message_id,
  33. properties.content_type, json.loads(
  34. body, object_hook=SimpleNamespace),
  35. callback, proxy, HeartBeat, name)
  36. channel.basic_consume(
  37. queue=queue, on_message_callback=_callback, auto_ack=True)
  38. name = "%s.%s.%d" % (name, socket.gethostname(), os.getpid())
  39. logger.info('start a consumer(%s) for queue(%s)', name, queue)
  40. channel.start_consuming()
  41. def launch(name, queue, config_file):
  42. logger.debug('load configuration from %s', config_file)
  43. with open(config_file, "rb") as config_fd:
  44. config = tomllib.load(config_fd)
  45. logger.debug('api-url:(%s)', config['app']['api-url'])
  46. redis_cli = open_redis_cluster(config['redis'])
  47. openai_proxy = config['app'].get('openai-proxy-url', None)
  48. logger.debug(f'openai_proxy:({openai_proxy})')
  49. start_consumer(redis_cli, name,
  50. config['rabbitmq'],
  51. queue, config['app']['api-url'],
  52. openai_proxy)