Celery用一批消息执行任务

tbo*_*tbo 7 python batch-processing celery

我想向芹菜发送消息,当它到达时,让我们说100条消息,我希望芹菜批量执行它们.如果我想批量提交到数据库,这是一种常见的情况.

为了这个目的,谷歌搜索我发现这个链接:用芹菜做批次:http: //celery.readthedocs.org/en/latest/reference/celery.contrib.batches.html

我的问题是,在示例中没有明显的方法来将数据提交给任务

例如,假设我们逐一提交一些消息:

task.apply_async((message,), link_error=error_handler.s())
Run Code Online (Sandbox Code Playgroud)

然后我们有以下任务实现:

@celery.task(name="process.data", base=Batches, flush_every=100, flush_interval=1)
def process_messages(requests):
   for request in requests:
       print request /// how I can take the message data submitted in my task for process?
Run Code Online (Sandbox Code Playgroud)

有没有其他方法来实现芹菜批次?谢谢

tbo*_*tbo 5

对于那些在经过多次试验和错误后会发现此帖子有用的人,我已设法通过以下方式从SimplRequest对象中获取数据:

使用以下方式提交数据时:

func.delay(data)
Run Code Online (Sandbox Code Playgroud)

从请求对象中获取args属性,该属性是包含数据的列表:

request.args[0]
request.args[1] 
etc.
Run Code Online (Sandbox Code Playgroud)

如果您使用以下方式提交数据:

func.apply_async((), {'data': data}, link_error=error_handler.s())
Run Code Online (Sandbox Code Playgroud)

然后数据以kwargs中的字典形式提供:

request.kwargs['data']
Run Code Online (Sandbox Code Playgroud)

最后,如示例所示,我们需要循环进入收集数据批处理的所有请求

for r in requests:
       data = r.kwargs['data']
Run Code Online (Sandbox Code Playgroud)

如果使用更简单明了的示例更新文档(此处)页面中的示例,那将是很好的