jb.*_*jb. 6 django celery djcelery
我使用celery来启动看起来像这样的任务集:
我将这些任务的结果汇总到单个答案中,然后用这个答案做一些事情---比如存储到数据库,保存到特殊结果文件等等.基本上在完成任务后我必须调用具有以下签名的函数:
def callback(result_file_name, task_result_list):
#store in file
def callback(entity_key, task_result_list):
#store in db
Run Code Online (Sandbox Code Playgroud)现在步骤1.在Celery队列中完成,步骤2在芹菜外部完成:
tasks = []
# add taksks to tasks list
task_group = group()
task_group.tasks = tasks
result = task_group.apply_async()
res = result.join()
# Aggregate results
# Save results to file, database whatever
Run Code Online (Sandbox Code Playgroud)
这种方法很麻烦,因为我必须停止一个线程,直到执行所有任务(可能需要几个小时).
我想以某种方式将步骤2转移到芹菜上 - 基本上我需要为整个任务集添加一个回调(据我所知,它在Celery中不受支持)或提交在所有这些子任务之后执行的任务.
有谁知道怎么做?我在django环境中使用它,所以我可以在数据库中存储一些状态.
我不能直接使用和弦,因为和弦使我能够创建这样的回调:
def callback(task_result_list):
#store in file
Run Code Online (Sandbox Code Playgroud)
没有明显的方法可以将其他参数传递给回调(特别是因为这些回调不能是本地函数).
我可以使用存储结果,TaskSetMeta但是这个实体没有状态字段---所以即使我要向TaskSetMeta添加信号,我也必须汇集任务结果,这可能会产生巨大的开销.
答案非常简单,我确实可以使用和弦 --- 并且附加参数(如报告文件名等)必须作为 kwargs 传递。
这是和弦任务:
@task
def print_and_sum(to_sum, file_name):
print file_name
print sum(to_sum)
return file_name, sum(to_sum)
Run Code Online (Sandbox Code Playgroud)
以下是实例化它的方法:
subtasks = [...]
result = chord(subtasks)(print_and_sum.subtask(kwargs={'file_name' : 'report_file.csv'}))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2727 次 |
| 最近记录: |