如何使用两个不同的芹菜项目,它消耗来自单个RabbitMQ安装的消息.
通常,如果我使用不同的rabbitmq,这些脚本可以正常工作.但是在生产机器上,我需要为它们共享相同的RabbitMQ后端.
注意:由于一些约束,我无法合并现有的新项目,因此它将是两个不同的项目.
最近,我正在进行GIT项目的实验,以了解大数据处理框架.
1,GIT项目:https://github.com/esperdyne/celery-message-processing
我们有以下组件:
1,AMPQ代理(RabbitMQ):它作为一个消息缓冲区,作为一个邮箱,为不同的用户交换消息!
2,worker:它作为服务服务器为各种服务客户端提供服务.3,Queue("celery":它作为一个多处理容器,用于同时处理各种工作者实例.
关键配置可以看作如下:
我们使用对象proj/celery.py来定义app,定义如下:
app = Celery('proj',
broker='amqp://',
backend='redis://localhost',
include=['proj.tasks'])
Run Code Online (Sandbox Code Playgroud)
在此处输入代码
当我们启动应用程序时:
1,当我们启动应用程序时,我们已经看到了从rabbitmq生成的消息,但是芹菜无法处理消息.
Parse.log看起来像这样:[2017-02-04 14:28:06,909:WARNING/MainProcess]收到并删除了未知消息.错误的目的地?!?
我们有以下问题:
4.2.1 AMQP机制 在这里输入图像描述 我们可以看到AMQP作为消息缓冲区,然后会有一个消息发送者和一个消息提取器:
在上图中,谁是邮件发件人,谁是邮件提取者.
4.2.2消息定义在我们的应用程序中,我们找不到用于定义要发送的消息的代码,或者用于接收AMQP的代码.
4.2.3消息监视器如何监视AMQP中的消息发送和接收.希望老师能指导我们解决问题,并给我们一些详细介绍
关于芹菜经纪人机制的介绍!
注意:错误日志可以在这里看到
[2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: [[u'maildir/allen- p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', …Run Code Online (Sandbox Code Playgroud)