将Celery与现有RabbitMQ消息一起使用

one*_*elf 9 python rabbitmq celery

我有一个现有的RabbitMQ部署,一些Java应用程序使用发送日志消息作为各种通道上的字符串JSON对象.我想使用Celery来使用这些消息并将它们写到各个地方(例如DB,Hadoop等).

我可以看到Celery被设计为RabbitMQ消息的生产者和消费者,因为它试图隐藏这些消息的传递机制.反正有没有让Celery消费由另一个应用程序创建的消息并在它们到达时运行作业?

ask*_*sol 13

目前很难将定制消费者添加到芹菜工作者,但这在开发版本中变化(变为3.1),我已经添加了对消费者启动步骤的支持.

由于我刚刚完成它,所以还没有文档,但这是一个例子:

from celery import Celery
from celery.bin import Option
from celery.bootsteps import ConsumerStep
from kombu import Consumer, Exchange, Queue

class CustomConsumer(ConsumerStep):
   queue = Queue('custom', Exchange('custom'), routing_key='custom')

   def __init__(self, c, enable_custom_consumer=False, **kwargs):
       self.enable = self.enable_custom_consumer

   def get_consumers(self, connection):
       return [
           Consumer(connection.channel(),
               queues=[self.queue],
               callbacks=[self.on_message]),
       ]

   def on_message(self, body, message):
       print('GOT MESSAGE: %r' % (body, ))
       message.ack()


celery = Celery(broker='amqp://localhost//')
celery.steps['consumer'].add(CustomConsumer)
celery.user_options['worker'].add(
    Option('--enable-custom-consumer', action='store_true',
           help='Enable our custom consumer.'),
)
Run Code Online (Sandbox Code Playgroud)

请注意,API可能会在最终版本中发生变化,我不确定的一件事是如何处理频道get_consumer(connection).目前,当连接丢失和关机时,消费者的频道关闭,但人们可能想要手动处理频道.在这种情况下,始终可以自定义ConsumerStep或编写新的StartStopStep.

  • 现在可以在http://celery.readthedocs.org/en/latest/userguide/extending.html找到该文档. (3认同)