芹菜:从任务中撤销整个链的干净方式

Ale*_*dan 9 django task celery chain

我的问题可能很基本,但我仍然无法在官方文档中找到解决方案.我在Django应用程序中定义了一个Celery链,执行一组依赖于eanch其他的任务:

chain(  tasks.apply_fetching_decision.s(x, y),
        tasks.retrieve_public_info.s(z, x, y),
        tasks.public_adapter.s())()
Run Code Online (Sandbox Code Playgroud)

显然,第二个和第三个任务需要父项的输出,这就是我使用链的原因.

现在的问题是:如果第一个任务中的测试条件失败,我需要以编程方式撤销第二个和第三个任务.如何以干净的方式做到这一点?我知道我可以从我定义链的方法中撤销链的任务(参见这个问题和本文),但第一个任务中,我没有后续任务的可见性,也没有链本身的可见性.

临时解决方案

我目前的解决方案是根据前一个任务的结果跳过后续任务中的计算:

@shared_task
def retrieve_public_info(result, x, y):
   if not result:
      return []
   ...

@shared_task
def public_adapter(result, z, x, y):
   for r in result:
       ...
Run Code Online (Sandbox Code Playgroud)

但这种"解决方法"有一些缺陷:

  • 为每个任务添加不必要的逻辑(基于前任的结果),从而影响重用
  • 仍然执行后续任务,以及所有产生的开销

我没有玩太多将链的引用传递给任务,因为害怕搞乱事情.我也承认我还没有尝试过抛出异常的方法,因为我认为不进行链接的选择可能是一个功能性(因此非特殊)的情况......

谢谢你的帮助!

Ale*_*dan 11

我想我找到这个问题答案:似乎是正确的方法.我想知道为什么这样的常见情况没有记录在任何地方.

为了完整性,我发布了基本代码快照:

@app.task(bind=True)  # Note that we need bind=True for self to work
def task1(self, other_args):
    #do_stuff
    if end_chain:
        self.request.callbacks[:] = []
    ....
Run Code Online (Sandbox Code Playgroud)

更新

我实施了一种更优雅的方式来处理这个问题,我想与你分享.我正在使用一个名为的装饰器revoke_chain_authority,这样它就可以自动撤销链而不重写我之前描述的代码.

from functools import wraps

class RevokeChainRequested(Exception):
    def __init__(self, return_value):
        Exception.__init__(self, "")

        # Now for your custom code...
        self.return_value = return_value


def revoke_chain_authority(a_shared_task):
    """
    @see: https://gist.github.com/bloudermilk/2173940
    @param a_shared_task: a @shared_task(bind=True) celery function.
    @return:
    """
    @wraps(a_shared_task)
    def inner(self, *args, **kwargs):
        try:
            return a_shared_task(self, *args, **kwargs)
        except RevokeChainRequested, e:
            # Drop subsequent tasks in chain (if not EAGER mode)
            if self.request.callbacks:
                self.request.callbacks[:] = []
            return e.return_value

    return inner
Run Code Online (Sandbox Code Playgroud)

这个装饰器可用于shared task如下:

@shared_task(bind=True)
@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    #...

    if condition:
        raise RevokeChainRequested(False)
Run Code Online (Sandbox Code Playgroud)

注意使用@wraps.有必要保留原始函数的签名,否则后者将丢失,并且celery会在调用正确的包装任务时弄乱(例如,它将始终调用第一个注册函数而不是正确的函数)


Bru*_*que 9

从Celery 4.0开始,我发现工作的是使用以下语句从当前任务实例的请求中删除剩余的任务:

self.request.chain = None
Run Code Online (Sandbox Code Playgroud)

假设你有一系列任务a.s() | b.s() | c.s().self如果通过将任务bind=True作为参数传递给任务的装饰器来绑定任务,则只能访问任务内的变量.

@app.task(name='main.a', bind=True):
def a(self):
  if something_happened:
    self.request.chain = None
Run Code Online (Sandbox Code Playgroud)

如果something_happened是真的,b并且c不会被执行.