调用 celery.task.chunks 的正确方法是什么?

Hu *_*Cao 4 python chunks celery

我正在使用 Python3 和 celery 处理可并行的任务。我喜欢将其分成几个块,这样就可以节省网络通信的成本。但是,celery 文档没有透露有关如何调用结果块的足够详细信息。尝试了不同的方法,但它们并没有像我预期的那样工作。我的代码段如下:

@app.task(name='pl.startregret')
def startregret(**kwargs): 
    items = list(zip(range(1000), range(1000)))
    chunk = regretclick.chunks(items, 10)
    print(chunk)
    for c in chunk:
        print(c)


@app.task(name='pl.regretclick')
def regretclick(x,y):
    print('got it.')
    return x + y
Run Code Online (Sandbox Code Playgroud)

我读了一些代码,认为我的代码中的块应该是一个生成器。然而,打印输出显示

[2014-10-15 13:12:15,930: WARNING/Worker-2] args
[2014-10-15 13:12:15,931: WARNING/Worker-2] subtask_type
[2014-10-15 13:12:15,931: WARNING/Worker-2] kwargs
[2014-10-15 13:12:15,931: WARNING/Worker-2] immutable
[2014-10-15 13:12:15,931: WARNING/Worker-2] options
[2014-10-15 13:12:15,931: WARNING/Worker-2] task
Run Code Online (Sandbox Code Playgroud)

关于调用块的正确方法有什么建议吗?

谢谢,

更新:我已阅读源代码并尝试了 chunk()。看起来现在唯一的问题是使用默认队列而不是 celeryconfig 中定义的队列。

Chi*_*and 8

考虑这样一个简单的添加任务。

@app.task()
def add(x, y):
    return x + y
Run Code Online (Sandbox Code Playgroud)

这是调用块任务的简单方法。

res = add.chunks(zip(range(10), range(10)), 2)()
Run Code Online (Sandbox Code Playgroud)

这会将给定的 10 个任务分块为 5 个大小为 2 的任务,并将其添加chunked tasks到默认队列中。如果您想将其路由到不同的队列,则必须在调用任务时指定它。

res = add.chunks(zip(range(10), range(10)), 2).apply_async(queue='my_special_queue')
Run Code Online (Sandbox Code Playgroud)

然后为这个队列启动一个worker来消费任务

worker -A your_app worker -l info -Q my_special_queue
Run Code Online (Sandbox Code Playgroud)