对大量 celery 任务进行排队

Mar*_*kus 5 python celery

我正在使用 Celery 分布式任务调度库编写一个 python3 应用程序。工作人员正在使用 greenlet 线程进行处理。该任务是与网络操作相关的 I/O。

我需要将大量芹菜任务作为单个组插入。在本例中,一次大约有 10000 (10k) 个 URL,每个 URL 都是单独的 celery 任务。

像单组这样的插入,在本地主机上运行redis或rabbitmq大约需要12秒。这太长了。

问:有什么方法可以优化使用 celery 的批量插入吗?

在其他线程中,我发现人们正在使用块,但是当我在块中提交它时 - 单个块正在单线程中处理(不使用 greenlet,这是必要的,因为在工作操作上阻塞 IO)。这会导致性能下降。考虑以下数字:

  1. 无块:插入 12 秒,处理 9 秒。
  2. 对于块:插入 3 秒,处理 27 秒。

因此,使用块是不可能的,因为阻塞网络操作将消除 greenlet 线程的性能优势。

soa = open('input.txt').readlines()
for line in soa:
    line = line.strip()
    s = line.split(':')
    l.append(check.s(s[0], s[1]))
    #l.append(s)
t = time.time()

res = check.chunks(l, 10)()
#print(res.get())
print("Submission taken %f" % (time.time() - t))

exit()
Run Code Online (Sandbox Code Playgroud)

块结果:提交耗时 2.251796 秒

l = []

soa = open('input.txt').readlines()

for line in soa:
    line = line.strip()
    s = line.split(':')
    l.append(s)

job = group(l)
t = time.time()
result = job.apply_async()
print("Submission taken %f" % (time.time() - t))
Run Code Online (Sandbox Code Playgroud)

常规结果:提交耗时 12.54412 秒

Ale*_*ers 0

Celery 实际上有一个名为 Group 和 Chunk 的任务包装器。

https://docs.celeryproject.org/en/latest/userguide/canvas.html

我认为,Chunk 需要一个结果后端,但只需将您的任务分成 50 到 200 个 URL 组就可以让 Celery 为您进行优化。

然而,如果您正在执行 10000 个网络绑定任务,那么这将是一个非常棘手的问题。