最近,我正在进行GIT项目的实验,以了解大数据处理框架.
1,GIT项目:https://github.com/esperdyne/celery-message-processing
我们有以下组件:
1,AMPQ代理(RabbitMQ):它作为一个消息缓冲区,作为一个邮箱,为不同的用户交换消息!
2,worker:它作为服务服务器为各种服务客户端提供服务.3,Queue("celery":它作为一个多处理容器,用于同时处理各种工作者实例.
关键配置可以看作如下:
我们使用对象proj/celery.py来定义app,定义如下:
app = Celery('proj',
broker='amqp://',
backend='redis://localhost',
include=['proj.tasks'])
Run Code Online (Sandbox Code Playgroud)
在此处输入代码
当我们启动应用程序时:
1,当我们启动应用程序时,我们已经看到了从rabbitmq生成的消息,但是芹菜无法处理消息.
Parse.log看起来像这样:[2017-02-04 14:28:06,909:WARNING/MainProcess]收到并删除了未知消息.错误的目的地?!?
我们有以下问题:
4.2.1 AMQP机制 在这里输入图像描述 我们可以看到AMQP作为消息缓冲区,然后会有一个消息发送者和一个消息提取器:
在上图中,谁是邮件发件人,谁是邮件提取者.
4.2.2消息定义在我们的应用程序中,我们找不到用于定义要发送的消息的代码,或者用于接收AMQP的代码.
4.2.3消息监视器如何监视AMQP中的消息发送和接收.希望老师能指导我们解决问题,并给我们一些详细介绍
关于芹菜经纪人机制的介绍!
注意:错误日志可以在这里看到
[2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: [[u'maildir/allen- p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'3cafda16-3e7c-44db-b05e-1327ef97ffc3'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'1f4c728b-680d-4dde-98b9-b153d5282780'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f21c911e-f2ac-462e-9662-2efbd27bcf91', u'root_id': None}}]}] (801b)
{content_type:'application/json' content_encoding:'utf-8'
delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 623422L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', 'N\xfd\x17=\x00\x00': 'gen17347@centos1', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}}
[2017-02-04 15:47:22,463: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672//
[2017-02-04 15:47:22,473: INFO/MainProcess] mingle: searching for neighbors
[2017-02-04 15:47:23,503: INFO/MainProcess] mingle: sync with 2 nodes
[2017-02-04 15:47:23,504: INFO/MainProcess] mingle: sync complete
[2017-02-04 15:47:23,530: INFO/MainProcess] parse@centos1 ready.
[2017-02-04 15:47:24,890: INFO/MainProcess] sync with es_deploy@centos1
[2017-02-04 15:47:51,017: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: [[u'maildir/allen-p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'765e5bbe-198f-405c-b10c-023d35e03981'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'7dacb897-d023-40b5-9874-e00b75107bbd'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f0d41289-33e2-4c8c-8d84-9d1d4c5a9c80', u'root_id': None}}]}] (801b)
{content_type:'application/json' content_encoding:'utf-8'
delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 3L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', 'N\xfd\x17=\x00\x00': 'gen19722@centos1', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}}
enter code here
Run Code Online (Sandbox Code Playgroud)
Ani*_*nis 11
给出你正在使用的芹菜和librabbitmq的版本会很有帮助.由于我有一个非常类似的问题,我猜你正在使用芹菜4.0.2和librabbitmq 1.6.1.
在这种情况下,这是一个已知的兼容性问题,您可以参考https://github.com/celery/celery/issues/3675和https://github.com/celery/librabbitmq/issues/93.
第一个链接为您提供解决问题的建议,即:
卸载librabbitmq
pip uninstall librabbitmq
(您可能需要多次调用此命令)
改变的出现amqp给pyamqp你borker网址.(如果你使用的话,不在配置文件中.这样做对我来说不起作用).
更准确地回答你的其他问题:你说得对,有一个寄件人和一个收件人.
发件人角色由您致电时创建的应用程序承担Celery(...).它的一个作用是充当任务的注册表,如果你在app/base.py中查看它的实现,你会发现它实现了一个send_task由apply_asyncTask类的方法直接调用的方法.此方法的作用是通过连接到代理发送任务的编组版本,以便工作人员可以获取它.用于传输消息的应用程序协议是amqp,其实现是librabbitmq.
在电线的另一侧,还有另一个实例,由工作人员启动进行取物工作.用芹菜的说法,它被称为a Consumer.您可以在worker/consumer/consumer.py中找到它的实现.您将看到它实现了一个create_task_handler依次定义on_task_received引发您看到的错误的函数.它是从工作程序中获取新任务时被调用的函数,然后是已处理的新任务.
因此,建议的解决方案在于更改amqp协议的实现,以便TypeError不会引发a on_task_received(在我看来,这似乎是由编码问题引起的).
我希望它能回答您的所有问题,让您更清楚地了解芹菜的工作原理.最后我应该说,据我所知,Celery的"常规"使用永远不会要求你篡改那些内部,并且你可以通过实现自定义任务类和自定义后端来实现99%的需求. .
| 归档时间: |
|
| 查看次数: |
2408 次 |
| 最近记录: |