跟踪 celery.group 任务的进度?

Anu*_*har 9 python django celery django-celery

@celery.task
def my_task(my_object):
    do_something_to_my_object(my_object)


#in the code somewhere 
tasks = celery.group([my_task.s(obj) for obj in MyModel.objects.all()])
group_task = tasks.apply_async()
Run Code Online (Sandbox Code Playgroud)

问:celery 有没有什么东西可以检测一个小组任务的进度?我可以计算有多少任务以及已经处理了多少任务吗?

Anu*_*har 6

在 shell 上修补(ipython 的选项卡自动完成)我发现group_task(这是一个celery.result.ResultSet对象)有一个方法被调用completed_count,它给出了我所需要的。

还可以在http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.ResultSet.completed_count找到文档

  • @zerohedge `result = task_group.apply_async()` 不应该阻塞等待结果,它会阻塞直到所有任务都入队,如果你有很多任务,这需要一段时间 (2认同)

Car*_*arl 5

这是基于 @dalore 答案的完整工作示例。

第一的tasks.py

import time
from celery import Celery, group

app = Celery('tasks', broker='pyamqp://guest@127.0.0.1//', backend='redis://localhost')

@app.task(trail=True)
def add(x, y):
    time.sleep(1)
    return x + y

@app.task(trail=True)
def group_add(l1, l2):
    return group(add.s(x1, x2) for x1, x2 in zip(l1, l2))()
Run Code Online (Sandbox Code Playgroud)

使用 Docker 启动 redis 服务器:docker run --name my-redis -p 6379:6379 -d redis

使用 Docker 启动 RabbitMQ docker run -d --hostname my-rabbit --name my-rabbit -p 5672:5672 rabbitmq:alpine

在单独的 shell 中启动单个进程 celery Worker:celery -A tasks worker --loglevel=info -c 1

然后运行下面的测试脚本。

from tasks import group_add
from tqdm import tqdm

total = 10

l1 = range(total)
l2 = range(total)
delayed_results = group_add.delay(l1, l2)
delayed_results.get()  # Wait for parent task to be ready.

results = []
for result in tqdm(delayed_results.children[0], total=total):
    results.append(result.get())
print(results)
Run Code Online (Sandbox Code Playgroud)

您应该看到如下所示的内容,进度条每秒增加 10%。

50%|#####     | 5/10 [00:05<00:05,  1.01s/it
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Run Code Online (Sandbox Code Playgroud)

最后,清理您的 redis 和rabbitmq 容器。

docker stop my-rabbit my-redis
docker rm my-rabbit my-redis
Run Code Online (Sandbox Code Playgroud)