tbo*_*tbo 7 python batch-processing celery
我想向芹菜发送消息,当它到达时,让我们说100条消息,我希望芹菜批量执行它们.如果我想批量提交到数据库,这是一种常见的情况.
为了这个目的,谷歌搜索我发现这个链接:用芹菜做批次:http: //celery.readthedocs.org/en/latest/reference/celery.contrib.batches.html
我的问题是,在示例中没有明显的方法来将数据提交给任务
例如,假设我们逐一提交一些消息:
task.apply_async((message,), link_error=error_handler.s())
Run Code Online (Sandbox Code Playgroud)
然后我们有以下任务实现:
@celery.task(name="process.data", base=Batches, flush_every=100, flush_interval=1)
def process_messages(requests):
for request in requests:
print request /// how I can take the message data submitted in my task for process?
Run Code Online (Sandbox Code Playgroud)
有没有其他方法来实现芹菜批次?谢谢
对于那些在经过多次试验和错误后会发现此帖子有用的人,我已设法通过以下方式从SimplRequest对象中获取数据:
使用以下方式提交数据时:
func.delay(data)
Run Code Online (Sandbox Code Playgroud)
从请求对象中获取args属性,该属性是包含数据的列表:
request.args[0]
request.args[1]
etc.
Run Code Online (Sandbox Code Playgroud)
如果您使用以下方式提交数据:
func.apply_async((), {'data': data}, link_error=error_handler.s())
Run Code Online (Sandbox Code Playgroud)
然后数据以kwargs中的字典形式提供:
request.kwargs['data']
Run Code Online (Sandbox Code Playgroud)
最后,如示例所示,我们需要循环进入收集数据批处理的所有请求
for r in requests:
data = r.kwargs['data']
Run Code Online (Sandbox Code Playgroud)
如果使用更简单明了的示例更新文档(此处)页面中的示例,那将是很好的
| 归档时间: |
|
| 查看次数: |
1589 次 |
| 最近记录: |