根据芹菜的结果路由到工人?

gak*_*gak 5 python multithreading celery apache-storm

我最近一直在使用Storm,它包含一个名为fields grouping(与group()Celery 中的概念无关)的概念,其中具有特定键的消息将始终路由到同一个 worker。

只是为了更清楚地了解我的意思,这里来自 Storm wiki。

字段分组:流按分组中指定的字段进行分区。例如,如果流按“user-id”字段分组,具有相同“user-id”的元组将始终执行相同的任务,但具有不同“user-id”的元组可能会执行不同的任务.

例如,从单词列表中读取,我想将以 a、b、c 开头的单词路由到仅工作进程、d、e、f 到另一个等。

想要这样做的一个原因可能是因为我希望一个进程负责一组相同数据的数据库读/写,以便进程之间不存在竞争条件。

我正在努力找出在 Celery 中实现这一目标的最佳方法。

到目前为止,我最好的解决方案是为每个“组”(例如字母.a、字母.d)使用一个队列,并确保工作进程的数量与队列的数量完全匹配。缺点是每个 worker 只需要运行一个进程,以及各种场景,比如 worker 死亡,或者 worker 被添加/删除。

我是 Celery 的新手,所以如果我提到的概念不正确,请纠正我。

gak*_*gak 6

涉及到一些胶水,但这是一个概念:

有一种方法可以使用 将任务直接发送给不同的工作人员CELERY_WORKER_DIRECT。将其设置为True创建到每个工作人员的路由。

我通过使用celery.current_app.control.inspect().ping()或确定活动主机定期确定活动工作人员。例如:

>>> hosts = sorted(celery.current_app.control.inspect().ping().keys())
['host5', 'host6']
Run Code Online (Sandbox Code Playgroud)

当我需要通过一个键进行路由时,我会散列该值,然后以工作人员的数量为模。这将平均分配任务,并将相同的密钥保留给同一个工人。例如:

>>> host_id = hash('hello') % len(hosts)
1
>>> host = hosts[host_id]
'host6'
Run Code Online (Sandbox Code Playgroud)

然后在执行任务时,我只需像这样指定交换和路由键:

my_task.apply_async(exchange='C.dq', routing_key=host)
Run Code Online (Sandbox Code Playgroud)

有一些缺点:

  1. 从我所见,在工作进程上设置 > 1 的并发将使每个进程消耗相同的资源,从而否定整个练习。不幸的解决方法是将其保持在 1。
  2. 如果 worker 在ping()和之间发生故障,apply_async消息将被发送到不存在的路由。对此的解决方法是捕获超时、重新声明可用主机、重新散列和重新发送。