one*_*elf 9 python rabbitmq celery
我有一个现有的RabbitMQ部署,一些Java应用程序使用发送日志消息作为各种通道上的字符串JSON对象.我想使用Celery来使用这些消息并将它们写到各个地方(例如DB,Hadoop等).
我可以看到Celery被设计为RabbitMQ消息的生产者和消费者,因为它试图隐藏这些消息的传递机制.反正有没有让Celery消费由另一个应用程序创建的消息并在它们到达时运行作业?
ask*_*sol 13
目前很难将定制消费者添加到芹菜工作者,但这在开发版本中变化(变为3.1),我已经添加了对消费者启动步骤的支持.
由于我刚刚完成它,所以还没有文档,但这是一个例子:
from celery import Celery
from celery.bin import Option
from celery.bootsteps import ConsumerStep
from kombu import Consumer, Exchange, Queue
class CustomConsumer(ConsumerStep):
queue = Queue('custom', Exchange('custom'), routing_key='custom')
def __init__(self, c, enable_custom_consumer=False, **kwargs):
self.enable = self.enable_custom_consumer
def get_consumers(self, connection):
return [
Consumer(connection.channel(),
queues=[self.queue],
callbacks=[self.on_message]),
]
def on_message(self, body, message):
print('GOT MESSAGE: %r' % (body, ))
message.ack()
celery = Celery(broker='amqp://localhost//')
celery.steps['consumer'].add(CustomConsumer)
celery.user_options['worker'].add(
Option('--enable-custom-consumer', action='store_true',
help='Enable our custom consumer.'),
)
Run Code Online (Sandbox Code Playgroud)
请注意,API可能会在最终版本中发生变化,我不确定的一件事是如何处理频道get_consumer(connection)
.目前,当连接丢失和关机时,消费者的频道关闭,但人们可能想要手动处理频道.在这种情况下,始终可以自定义ConsumerStep或编写新的StartStopStep.