Python Redis Queue (rq): how to avoid reloading the ML model for every job?

Vil*_*mar 12 python redis python-rq

I want to use rq to queue my ML predictions. Example code (pseudo-ish):

predict.py:

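import tensorflow as tf

def predict_stuff(foo):
    # The model is loaded from disk on every call; this is the cost to avoid.
    model = tf.keras.models.load_model('model.h5')  # hypothetical path
    result = model.predict(foo)
    return result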

app.py:

from rq import Queue
from redis import Redis
from predict import predict_stuff

q = Queue(connection=Redis())
for foo in baz:  # baz: the iterable of inputs to predict on
    job = q.enqueue(predict_stuff, foo)

worker.py:

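import sys
from redis import Redis
from rq import Worker

# Preload libraries so they are imported once per worker, not once per job
import tensorflow as tf

qs = sys.argv[1:] or ['default']

# Pass the connection explicitly (the Connection context manager is deprecated)
w = Worker(qs, connection=Redis())
w.work()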

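I've read the rq docs, which explain that you can preload libraries to avoid importing them every time a job runs (which is why the example worker code imports tensorflow). However, I'd also like to move the model loading out of predict_stuff, so that the model isn't loaded again every time the worker runs a job. How can I do that?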
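One common way to get this behaviour is to wrap the load in a cached accessor. The sketch below uses a stand-in loader instead of TensorFlow (`_load_model`, `get_model` and `LOAD_CALLS` are hypothetical names, not part of rq). One rq-specific caveat applies: the default `Worker` forks a fresh work horse for every job, so a cache filled *inside* a job is discarded when that job finishes. The cache only pays off if `get_model()` is called once in `worker.py` before `w.work()`, so the forked children inherit the loaded model, or if you run rq's `SimpleWorker`, which executes jobs in the worker process itself.

```python
import functools

# Hypothetical stand-in for the real loader (e.g. tf.keras.models.load_model);
# it counts calls so the caching behaviour is visible.
LOAD_CALLS = 0


def _load_model():
    global LOAD_CALLS
    LOAD_CALLS += 1
    return lambda x: x * 2  # dummy "model"


@functools.lru_cache(maxsize=None)
def get_model():
    """Load the model on the first call and return the cached instance afterwards."""
    return _load_model()


def predict_stuff(foo):
    model = get_model()
    return model(foo)


# In worker.py, this call would go right before w.work(), in the parent process.
get_model()
results = [predict_stuff(i) for i in range(3)]
print(results, LOAD_CALLS)
```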

kdo*_*gan 5

I'm not sure if this helps, but following the example here:

https://github.com/rq/rq/issues/720

you can share the model instead of sharing a connection pool.

Pseudocode:

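import tensorflow as tf

from rq import Worker as _Worker
from rq.local import LocalStack

_model_stack = LocalStack()

def get_model():
    """Return the model pushed by the worker; fail if called outside a worker."""
    m = _model_stack.top
    if m is None:
        raise RuntimeError('Run outside of worker context')
    return m

class Worker(_Worker):
    """Worker that loads the model once, before it starts processing jobs."""

    def work(self, *args, **kwargs):
        # Loaded once in the worker process; forked work horses inherit it.
        _model_stack.push(tf.keras.models.load_model('model.h5'))  # hypothetical path
        return super().work(*args, **kwargs)

def predict_stuff_job(foo):
    model = get_model()
    result = model.predict(foo)
    return result

# Start a worker with this class, e.g.: rq worker -w predict.Worker
# (assuming this file is predict.py)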

I use something similar for a "global" file reader I wrote: load the instance into a LocalStack and have the workers read it from the stack.
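Why pushing the model in `work()` is enough: the default rq `Worker` forks a child process (the work horse) for each job, and a forked child starts with a copy of the parent's memory. The stdlib-only sketch below illustrates just that mechanism, not rq itself. It is Unix-only because it uses `os.fork`, and `load_model` and `run_job_in_child` are hypothetical stand-ins. It loads a dummy model once in the parent and shows that forked children use it without reloading.

```python
import os

MODEL = None  # module-level slot, filled once in the parent


def load_model():
    # Hypothetical stand-in for an expensive load such as tf.keras.models.load_model(...)
    return {"weights": [1, 2, 3]}


def run_job_in_child(job_arg):
    """Fork like rq's default Worker does and run the 'job' in the child."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:  # child: plays the role of the work horse
        try:
            os.close(read_fd)
            # The child sees the parent's MODEL without reloading it.
            result = sum(MODEL["weights"]) * job_arg
            os.write(write_fd, str(result).encode())
        finally:
            os._exit(0)  # never fall back into the parent's code path
    os.close(write_fd)
    data = os.read(read_fd, 64)  # blocks until the child writes
    os.close(read_fd)
    os.waitpid(pid, 0)
    return int(data.decode())


MODEL = load_model()  # done once, before any job is forked
results = [run_job_in_child(n) for n in (1, 2)]
print(results)
```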


Vil*_*mar 3

In the end I couldn't figure out how to do it with python-rq. I moved to Celery and did it like this:

app.py:

from tasks import predict_stuff

for foo in baz:
    task = predict_stuff.delay(foo)

tasks.py:

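import tensorflow as tf
from celery import Celery
from celery.signals import worker_process_init

# Broker URL is an assumption; point it at your own Redis/RabbitMQ instance.
cel_app = Celery('tasks', broker='redis://localhost:6379/0')
model = None  # filled in once per worker process by on_worker_init

@worker_process_init.connect
def on_worker_init(**_):
    # Sent when each prefork pool child process starts, before it runs tasks.
    global model
    model = tf.keras.models.load_model('model.h5')  # hypothetical path

@cel_app.task(name='predict_stuff')
def predict_stuff(foo):
    result = model.predict(foo)
    return result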