nin*_*ser 11 python rabbitmq celery celery-task
乍一看,我非常喜欢Celery中的"批量"功能,因为我需要在调用API之前对一定数量的ID进行分组(否则我可能会被踢掉).
不幸的是,在稍微测试一下时,批处理任务似乎与其他Canvas原语(在本例中为链)不能很好地兼容.例如:
@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
for request in requests:
a.backend.mark_as_done(request.id, 42, request=request)
print "filter_by_price " + str([r.args[0] for r in requests])
@a.task
def completed():
print("complete")
Run Code Online (Sandbox Code Playgroud)
因此,通过这个简单的工作流程
chain(get_price.s("ID_1"), completed.si()).delay()
Run Code Online (Sandbox Code Playgroud)
我看到这个输出:
[2015-07-11 16:16:20,348: INFO/MainProcess] Connected to redis://localhost:6379/0
[2015-07-11 16:16:20,376: INFO/MainProcess] mingle: searching for neighbors
[2015-07-11 16:16:21,406: INFO/MainProcess] mingle: all alone
[2015-07-11 16:16:21,449: WARNING/MainProcess] celery@ultra ready.
[2015-07-11 16:16:34,093: WARNING/Worker-4] filter_by_price ['ID_1']
Run Code Online (Sandbox Code Playgroud)
5秒后,filter_by_price()会像预期的那样被触发.问题是completed()永远不会被调用.
有什么想法可以在这里发生?如果不使用批次,什么可能是一个解决这个问题的好方法?
PS:我已经CELERYD_PREFETCH_MULTIPLIER=0
像文档说的那样设置了.
看起来批处理任务的行为与正常任务有很大不同.批处理任务甚至不会发出像task_success这样的信号.
由于您需要在之后调用completed
任务get_price
,您可以直接从get_price
自身调用它.
@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
for request in requests:
# do something
completed.delay()
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
613 次 |
最近记录: |