我正在使用 Celery 分布式任务调度库编写一个 python3 应用程序。工作人员正在使用 greenlet 线程进行处理。该任务是与网络操作相关的 I/O。
我需要将大量芹菜任务作为单个组插入。在本例中,一次大约有 10000 (10k) 个 URL,每个 URL 都是单独的 celery 任务。
像单组这样的插入,在本地主机上运行redis或rabbitmq大约需要12秒。这太长了。
问:有什么方法可以优化使用 celery 的批量插入吗?
在其他线程中,我发现人们正在使用块,但是当我在块中提交它时 - 单个块正在单线程中处理(不使用 greenlet,这是必要的,因为在工作操作上阻塞 IO)。这会导致性能下降。考虑以下数字:
因此,使用块是不可能的,因为阻塞网络操作将消除 greenlet 线程的性能优势。
soa = open('input.txt').readlines()
for line in soa:
line = line.strip()
s = line.split(':')
l.append(check.s(s[0], s[1]))
#l.append(s)
t = time.time()
res = check.chunks(l, 10)()
#print(res.get())
print("Submission taken %f" % (time.time() - t))
exit()
Run Code Online (Sandbox Code Playgroud)
块结果:提交耗时 2.251796 秒
l = []
soa = open('input.txt').readlines()
for line in soa:
line = line.strip()
s = line.split(':')
l.append(s)
job = group(l)
t = time.time()
result = job.apply_async()
print("Submission taken %f" % (time.time() - t))
Run Code Online (Sandbox Code Playgroud)
常规结果:提交耗时 12.54412 秒
Celery 实际上有一个名为 Group 和 Chunk 的任务包装器。
https://docs.celeryproject.org/en/latest/userguide/canvas.html
我认为,Chunk 需要一个结果后端,但只需将您的任务分成 50 到 200 个 URL 组就可以让 Celery 为您进行优化。
然而,如果您正在执行 10000 个网络绑定任务,那么这将是一个非常棘手的问题。