运行一组链

Ada*_*hue 8 celery

我正在尝试在 Celery 中运行一组链。我创建了一组链:

chains = [celery.chain(a.s(i), b.s(), c.s()) for i in items]
Run Code Online (Sandbox Code Playgroud)

return 将其包装在一个组中:

group = celery.group(*chains)
Run Code Online (Sandbox Code Playgroud)

这里的期望是 Celery 随后将安排每个完整链作为独立任务运行。事实上,从逻辑上讲,这似乎就是正在发生的事情。但有两个问题:

  1. 如果链的数量很大,则似乎没有任何东西可以运行。Celery 或rabbitmq 控制台中没有错误。(是的,使用rabbitmq。)

  2. Celery 似乎会执行组中所有任务中每个链的第一个任务,然后再执行每个链的第二个任务。(也就是说,它似乎将链展开成一组任务a、任务b,然后是任务。它们仍然链接到相应的链条目,但是当某些任务比其他任务完成得更快c时,这会引入延迟a

有什么想法吗?

aer*_*hov 5

一个非常有趣的问题!

我已经编写了代码来使用内存后端和一个进程(位于底部)来测试您的情况。celery -A module-name --loglevel=info -c 10

  1. 类似障碍的行为:这似乎不是问题。如果您应用不同的睡眠,或者以高并行性执行大量任务,您会发现b任务c是并行执行的a

  2. 在大链上失败:当我尝试创建 1000000 条链时,代码实际上在链创建时默默地失败,所以它看起来更像是 python 内存问题。100000长的柴就可以了


代码

from celery import Celery, chain, group
from pprint import pprint
import threading
from time import sleep

app = Celery('chaintext')
app.conf.update(
    BROKER_BACKEND = 'memory',
    CELERY_RESULT_BACKEND = 'cache',
    CELERY_CACHE_BACKEND  = 'memory',
    CELERY_EAGER_PROPAGATES_EXCEPTIONS = True,
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_ENABLE_UTC=True,
    CELERYD_POOL = 'celery.concurrency.threads:TaskPool'
)

@app.task
def a(i):
    result = 'A %s' % i
    sleep((i%3)/ 10.0)
    pprint(result)
    return result


@app.task
def b(self,i):
    result = 'B %s' % i    
    sleep((i%3)/  10.0)
    pprint(result)
    return result

@app.task
def c(self,i):
    result = 'C %s' % i
    sleep((i%3)/  10.0)
    pprint(result)
    return result

def main():
    print "MAIN"
    import time
    time.sleep(5)
    print "STARTING"
    chains = [chain(a.s(i), b.s(i), c.s(i)) for i in range(1000000)]
    print "CREATED CHAINS"
    g = group(*chains)
    print "CREATED GROUP"
    result = g.apply_async()
    print "QUEUED GROUP"

    print result.get()

t1 = threading.Thread(target=main)
t1.start()
Run Code Online (Sandbox Code Playgroud)

  • @Florent 考虑到装饰器没有 @app.task(bind=True) 参数,这只是我的一个错误,我想根本不需要 self (2认同)