Celery:运行冗长初始化函数的正确方法(每个进程)

bas*_*kum 11 python initialization multiprocessing python-c-extension celery

TLDR;

要为celery生成的每个进程运行初始化函数,您可以使用该worker_process_init信号.正如您可以在文档中看到的那样,该信号的处理程序不应该阻塞超过4秒.但是有什么选择,如果我必须运行一个执行时间超过4秒的init函数?

问题

我使用C扩展模块在芹菜任务中运行某些操作.此模块需要初始化,可能需要几秒钟(可能是4-10).因为我宁愿不为每个任务运行这个init函数,但对于每个生成的进程,我都使用了这个worker_process_init信号:

#lib.py 
import isclient #c extension module
client = None
def init():
    global client
    client = isclient.Client() #this might take a while

def create_ne_list(text):
    return client.ne_receiventities4datachunk(text)

#celery.py
from celery import Celery
from celery.signals import worker_process_init
from lib import init

celery = Celery(include=[
    'isc.ne.tasks'
])

celery.config_from_object('celeryconfig')

@worker_process_init.connect
def process_init(sender=None, conf=None, **kwargs):
    init()

if __name__ == '__main__':
    celery.start()

#tasks.py
from celery import celery
from lib import create_ne_list as cnl

@celery.task(time_limit=1200)
def create_ne_list(text):
    return cnl(text)
Run Code Online (Sandbox Code Playgroud)

当我运行这段代码时会发生什么,这是我在之前的问题中所描述的(Celery:无限期地重复超时(等待UP消息超时)).简而言之:由于我的init函数需要超过4秒,因此有时会发生工作程序被杀死并重新启动,并且在重新启动过程中会再次被杀死,因为这是4秒无响应后自动发生的情况.这最终导致无限重复的杀戮和重启过程.

另一个选择是使用信号为每个工作人员仅运行一次init函数worker_init.如果我这样做,我会遇到一个不同的问题:现在排队的进程由于某种原因而卡住了.当我以并发3开始工作,然后发送几个任务时,前三个将完成,其余的将不会被触及.(我假设它可能与事实有关,client需要在多个进程之间共享对象,并且C扩展由于某些原因不支持它.但说实话,我相对较新多处理,所以我可以猜测)

所以,问题仍然存在:如何在每个进程中运行一个超过4秒的init函数?有没有正确的方法来做到这一点,那会是什么样的方式?

cha*_*wan 8

Celery将init超时限制为4.0秒.检查源代码

要解决此限制,您可以考虑在创建芹菜应用程序之前对其进行更改

from celery.concurrency import asynpool
asynpool.PROC_ALIVE_TIMEOUT = 10.0 #set this long enough
Run Code Online (Sandbox Code Playgroud)

请注意,没有用于更改此值的配置或设置.


Zev*_*ert 7

从 celery 4.4.0 开始, @changhwan答案不再是唯一的方法。这是添加此功能的配置选项的拉取请求。

使用配置选项

对于 celery ^4.4.0,该值是可配置的。使用 celery 应用程序配置选项worker_proc_alive_timeout。来自稳定版本文档

worker_proc_alive_timeout

默认值:4.0。

等待新工作进程启动时的超时时间(以秒为单位)(int/float)。

例子:

from celery import Celery
from celery.signals import worker_process_init

app = Celery('app')
app.conf.worker_proc_alive_timeout = 10

@worker_process_init.connect
def long_init_function(*args, **kwargs):
   import time
   time.sleep(8)
Run Code Online (Sandbox Code Playgroud)