Celery并行分布式任务与多处理

Pro*_*eus 66 python django multithreading multiprocessing celery

我有一个CPU密集型Celery任务.我想在许多EC2实例中使用所有处理能力(核心)来更快地完成这项工作(我认为芹菜并行分布式多任务处理任务).

术语,线程,多处理,分布式计算,分布式并行处理都是我试图更好理解的术语.

示例任务:

  @app.task
  for item in list_of_millions_of_ids:
      id = item # do some long complicated equation here very CPU heavy!!!!!!! 
      database.objects(newid=id).save()
Run Code Online (Sandbox Code Playgroud)

使用上面的代码(如果可能的话,有一个例子)如何使用Celery分配这个任务,允许利用云中所有可用机器的所有计算CPU功率来分离这一任务?

dan*_*ano 111

你的目标是:

  1. 将您的工作分配给许多机器(分布式计算/分布式并行处理)
  2. 在所有CPU上分配给定计算机上的工作(多处理/线程)

芹菜可以很容易地为你做这两件事.首先要了解的是,默认情况下,每个芹菜工作者都配置为运行与系统上可用的CPU核心一样多的任务:

并发性是用于同时处理任务的prefork工作进程的数量,当所有这些进程忙于完成工作时,新任务必须等待其中一个任务完成才能处理.

默认并发数是该计算机上的CPU数(包括核心数),您可以使用-c选项指定自定义数.没有建议的值,因为最佳数量取决于许多因素,但如果您的任务主要是I/O限制,那么您可以尝试增加它,实验表明添加超过CPU数量的两倍是很少的有效,而且可能会降低性能.

这意味着每个单独的任务都不需要担心使用多处理/线程来使用多个CPU /核心.相反,芹菜将同时运行足够的任务来使用每个可用的CPU.

有了这个,下一步就是创建一个处理你的某些子集的任务list_of_millions_of_ids.这里有几个选项 - 一个是让每个任务处理一个ID,所以你运行N个任务,在哪里N == len(list_of_millions_of_ids).这将保证工作在所有任务中均匀分配,因为永远不会有一个工人提前结束而只是等待的情况; 如果它需要工作,它可以从队列中拉出一个id.您可以使用芹菜这样做(如John Doe所述)group.

tasks.py:

@app.task
def process_id(item):
    id = item #long complicated equation here
    database.objects(newid=id).save()
Run Code Online (Sandbox Code Playgroud)

并执行任务:

from celery import group
from tasks import process_id

jobs = group(process_id.s(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()
Run Code Online (Sandbox Code Playgroud)

另一个选择是将列表分成更小的部分,并将这些部分分发给您的工作人员.这种方法存在浪费一些周期的风险,因为最终可能会有一些工人在等待,而其他人仍在工作.然而,芹菜文件指出,这种担忧往往是没有根据的:

有些人可能会担心,对任务进行分块会导致并行性降低,但对于繁忙的群集而言,这种情况很少发生,实际上,由于避免了消息传递的开销,因此可能会大大提高性能.

因此,您可能会发现,由于减少了消息传递开销,对列表进行分块并将块分发到每个任务的性能会更好.你可以通过这种方式减轻数据库的负担,通过计算每个id,将其存储在列表中,然后在完成后将整个列表添加到DB中,而不是一次只执行一个id .分块方法看起来像这样

tasks.py:

@app.task
def process_ids(items):
    for item in items:
        id = item #long complicated equation here
        database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.
Run Code Online (Sandbox Code Playgroud)

并开始任务:

from tasks import process_ids

jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here.
jobs.apply_async()
Run Code Online (Sandbox Code Playgroud)

你可以尝试一下块大小给你最好的结果.你想找到一个最佳位置,你可以减少消息开销,同时保持足够小的尺寸,使你不会让工人比其他工人更快地完成他们的大块,然后只是等待无所事事.

  • @Spike不太好.目前编写的任务只能使用一个核心.为了使单个任务使用多个核心,我们将引入`threading`或`multiprocessing`.而不是这样做,我们让每个芹菜工人产生与机器上可用核心一样多的任务(这在芹菜中默认发生).这意味着在整个集群中,每个核心都可以用来处理你的`list_of_million_ids`,每个任务都使用一个核心.因此,我们不是让一个任务使用多个核心,而是使用一个核心来完成许多任务.那有意义吗? (3认同)

tk.*_*tk. 11

在分销世界中,首先应该记住的只有一件事:

过早优化是万恶之源.由D. Knuth

我知道这听起来很明显但在分发双重检查之前你使用的是最好的算法(如果它存在......).话虽如此,优化分配是三件事之间的平衡:

  1. 从持久性媒体中写入/读取数据,
  2. 将数据从中A移动到中B,
  3. 处理数据,

计算机的制作越接近您的处理单元(3),速度越快,效率越高(1)和(2).经典集群中的订单将是:网络硬盘驱动器,本地硬盘驱动器,RAM,内部处理单元领域......现在处理器变得越来越复杂,被认为是通常称为核心的独立硬件处理单元的集合,这些核心处理数据(3)通过线程(2).想象一下,你的核心是如此之快,以至于当你使用一个线程发送数据时,你正在使用50%的计算机能力,如果核心有2个线程,那么你将使用100%.每个核心两个线程称为超线程,您的操作系统每个超线程核心将看到2个CPU.

管理处理器中的线程通常称为多线程.从OS管理CPU通常称为多处理.管理集群中的并发任务通常称为并行编程.管理集群中的依赖任务通常称为分布式编程.

那么你的瓶颈在哪里?

  • 在(1)中:尝试从较高级别(靠近处理单元的那个级别,例如,如果网络硬盘驱动器缓慢首先保存在本地硬盘驱动器中)保持并流式传输
  • 在(2)中:这是最常见的一种,尽量避免分发不需要的通信包或"即时"压缩包(例如,如果HD很慢,只保存"批量计算"消息并保持RAM中的中间结果.
  • 在(3)中:你完成了!您正在使用所有处理能力.

芹菜怎么样?

Celery是分布式编程的消息传递框架,它将使用代理模块进行通信(2),使用后端模块进行持久化(1),这意味着您可以通过更改配置来避免大多数瓶颈(如果可能)您的网络,只在您的网络上.首先分析您的代码,以在单台计算机上实现最佳性能.然后使用默认配置在集群中使用celery并设置CELERY_RESULT_PERSISTENT=True:

from celery import Celery

app = Celery('tasks', 
             broker='amqp://guest@localhost//',
             backend='redis://localhost')

@app.task
def process_id(all_the_data_parameters_needed_to_process_in_this_computer):
    #code that does stuff
    return result
Run Code Online (Sandbox Code Playgroud)

在执行期间打开您最喜欢的监控工具,我使用RabbitMQ的默认值和芹菜花的顶部和cpus的顶部,您的结果将保存在您的后端.网络瓶颈的一个例子是任务队列增长太多以至于延迟执行,您可以继续更改模块或芹菜配置,如果不是您的瓶颈在其他地方.


Let*_*t4U 8

为什么不group为此使用芹菜任务?

http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups

基本上,你应该ids分成块(或范围)并给它们一堆任务group.

对于更复杂的smth,比如聚合特定芹菜任务的结果,我已经成功地将chord任务用于类似目的:

http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords

增加settings.CELERYD_CONCURRENCY的数量是合理的,你能负担得起,那么这些工人芹菜将继续执行群组或弦在您的任务,直至完成.

注意:由于kombu过去在重复使用大量工作时出现问题,我不知道现在是否已修复.也许是,但如果没有,减少CELERYD_MAX_TASKS_PER_CHILD.

基于我运行的简化和修改代码的示例:

@app.task
def do_matches():
    match_data = ...
    result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())
Run Code Online (Sandbox Code Playgroud)

summarize获得所有single_batch_processor任务的结果.每个任务都在任何Celery工作者上运行,kombu协调它.

现在我明白了:single_batch_processorsummarize还必须芹菜任务,不规律的功能-否则当然也不会被并行化(我不即使它不是一个芹菜任务肯定和弦构造函数会接受它).