我有一个很大的 csv 文件,我将其拆分为一个包含 100000 行的块列表,将每个块传递给一个函数来进行复杂的计算,并将结果附加到 global_list 中。当最后一个块完成时,我使用 global_list 并对其进行一些统计。我怎样才能让 celery 并行处理所有块,但要等到最后一个任务/最后一个块完成后再在 global_list 上执行函数 complex_calc?
感谢您的帮助
for chunk in global_chunk_list:
def func_calc.delay(chunk) #<<<<< use celery tasks
complex_calc(global_list) #<<<<< should only start when processing last chunk is finished
@celery.task(name='func_calc')
def func_calc(chunk):
...
#save chunk in a global list
global_list.append(result)
def complex_calc(global_list):
...
Run Code Online (Sandbox Code Playgroud)
合适的方法是使用Groups和join方法等待一组并行任务完成执行。
task_group = group([func_calc.s(chunk) for chunk in global_chunk_list])
result_group = task_group.apply_async()
results = result_group.join() # wait for all results
Run Code Online (Sandbox Code Playgroud)
另请参阅文档中的示例。(一个区别是使用join而不是get,它等待任务完成)另见这个答案。
>>> from celery import group
>>> from tasks import add
>>> job = group([
... add.s(2, 2),
... add.s(4, 4),
... add.s(8, 8),
... add.s(16, 16),
... add.s(32, 32),
... ])
>>> result = job.apply_async()
>>> result.ready() # have all subtasks completed?
True
>>> result.successful() # were all subtasks successful?
True
>>> result.get()
[4, 8, 16, 32, 64]
Run Code Online (Sandbox Code Playgroud)
要有效地执行此操作,您需要配置一个结果后端。
| 归档时间: |
|
| 查看次数: |
3835 次 |
| 最近记录: |