Blu*_*ber 15 python redis celery
我正在使用带有Redis后端的Celery 3.1.9.我正在运行的工作由几个以和弦和链条运行的子任务组成.结构如下所示:
列表中的每个项目都是一个子任务,它们都链接在一起.第2步和第4步是和弦.通过为第4步创建和弦来连接整个事情,其中回调是4 - > 6的链,然后为第2步创建和弦,其回调是3 - >第一和弦.然后,最后创建一个链 - >第二个和弦.然后使用delay()启动该链,并将其ID存储在数据库中.
问题是双重的,首先我希望能够撤销整个事情,其次我希望在我的Task类上有一个自定义的on_failure来进行一些清理,并向用户报告失败.
目前我存储了链的任务ID,我想我可以用它来撤销链.此外,如果出现错误,我想将链路连接到它的根(在on_failure处理程序中)以从数据库中检索相关记录.这不起作用,因为当您仅使用任务的ID重新创建AsyncResult实例时,它的父属性为None.
我尝试的第二件事是存储在外链的结果上调用的serializable()的结果.但是,这不返回整个AsyncResult对象树,它只返回链中第一级的ID(因此不是和弦中子项的ID.)
我尝试的第三件事就是基本上自己实现serializable(),但事实证明,原始方法不会超过2级的原因是因为链的子节点是celery.canvas.chord对象,而不是AsyncResult实例.
问题的例证:
chord([
foo.si(),
foo.si(),
foo.si(),
], bar.si() | bar.si())
res = chord.apply_async()
pprint(res.serializable())
Run Code Online (Sandbox Code Playgroud)
打印以下内容:
(('50c9eb94-7a63-49dc-b491-6fce5fed3713',
('d95a82b7-c107-4a2c-81eb-296dc3fb88c3',
[(('7c72310b-afc7-4010-9de4-e64cd9d30281', None), None),
(('2cb80041-ff29-45fe-b40c-2781b17e59dd', None), None),
(('e85ab83d-dd44-44b5-b79a-2bbf83c4332f', None), None)])),
None)
Run Code Online (Sandbox Code Playgroud)
第一个ID是回调链的id,第二个ID来自和弦任务本身,后三个是和弦内的实际任务.但是我无法从回调链中的任务得到结果(即两个bar.si()调用的ID).
有没有办法获得实际的任务ID?
一种巧妙的方法是使用 apply_async 调用任务,保存任务 ID 并手动等待它们。通过这种方式,您将完全控制发生的事情,但您应该只等待异步任务作为最后的手段。现在您可以访问任务 ID、返回值等。例如:
task1 = a_task.apply_async()
task2 = b_task.apply_async()
task3 = c_task.apply_async()
tasks = [task1, task2, task3]
for task in tasks:
task.wait()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1131 次 |
| 最近记录: |