使用Celery初始化带有参数的worker

Jos*_*man 9 python celery

我遇到的问题似乎对我来说似乎相对简单.

我正在使用Celery 3.1和Python 3,我想用参数初始化我的worker,以便他们可以使用这些细节进行设置.

具体而言:这些工作人员将消耗需要使用身份验证凭据与第三方API进行交互的任务.在使用任何任务之前,工作人员必须将身份验证详细信息传递给API服务器(身份验证详细信息在第一次身份验证请求后存储在cookie中).

我希望在从CLI启动时将这些登录凭据传递给worker.然后,我希望工作人员使用它们进行身份验证并存储会话以供将来使用时使用(理想情况下,这将存储在可从任务访问的属性中).

这可能与芹菜有关吗?

作为旁注,我考虑将一个requests.session对象(来自Python requests库)作为任务参数传递,但这需要序列化,这看起来是不赞成的.

Pie*_*rre 17

我建议使用一个抽象的任务基类并缓存requests.session.

来自Celery文档:

没有为每个请求实例化任务,但是在任务注册表中将任务注册为全局实例.

这意味着__init__构造函数每个进程只调用一次,并且任务类在语义上更接近Actor.

这对缓存资源也很有用......

import requests
from celery import Task

class APITask(Task):
    """API requests task class."""

    abstract = True

    # the cached requests.session object
    _session = None

    def __init__(self):
        # since this class is instantiated once, use this method
        # to initialize and cache resources like a requests.session
        # or use a property like the example below which will create
        # a requests.session only the first time it's accessed

    @property
    def session(self):
        if self._session is None:
            # store the session object for the first time
            session = requests.Session()
            session.auth = ('user', 'pass')

            self._session = session

        return self._session
Run Code Online (Sandbox Code Playgroud)

现在,当您创建将发出API请求的任务时:

@app.task(base=APITask, bind=True)
def call_api(self, url):
    # self will refer to the task instance (because we're using bind=True)
    self.session.get(url)
Run Code Online (Sandbox Code Playgroud)

您还可以使用app.task装饰器作为额外参数传递API身份验证选项,该参数将__dict__在任务的基础上设置,例如:

# pass a custom auth argument
@app.task(base=APITask, bind=True, auth=('user', 'pass'))
def call_api(self, url):
    pass
Run Code Online (Sandbox Code Playgroud)

并使基类使用传递的身份验证选项:

class APITask(Task):
    """API requests task class."""

    abstract = True

    # the cached requests.session object
    _session = None

   # the API authentication
   auth = ()

    @property
    def session(self):
        if self._session is None:
            # store the session object for the first time
            session = requests.Session()
            # use the authentication that was passed to the task
            session.auth = self.auth

            self._session = session

        return self._session
Run Code Online (Sandbox Code Playgroud)

您可以在Celery docs网站上阅读更多内容:

现在回到原始问题,即从命令行向工作人员传递额外的参数:

在Celery文档中有一节介绍添加新命令行选项,这是从命令行向用户传递用户名和密码的示例:

$ celery worker -A appname --username user --password pass
Run Code Online (Sandbox Code Playgroud)

代码:

from celery import bootsteps
from celery.bin import Option


app.user_options['worker'].add(
    Option('--username', dest='api_username', default=None, help='API username.')
)

app.user_options['worker'].add(
    Option('--password', dest='api_password', default=None, help='API password.')
)


class CustomArgs(bootsteps.Step):

    def __init__(self, worker, api_username, api_password, **options):
        # store the api authentication
        APITask.auth = (api_username, api_password)


app.steps['worker'].add(CustomArgs)
Run Code Online (Sandbox Code Playgroud)