Celery Worker从哪个目录开始

qre*_*0ct 3 python celery celery-task celeryd

我需要一些关于 Celery 工人的帮助。我特别无法理解 celery 工作命令需要从哪里(哪个目录)被触发,它背后的概念是什么以及有关导入的一些事情。

\n\n

假设我有以下目录结构:

\n\n
.\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 entry.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 state1\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 family1\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task1.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task2.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task3.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 family2\n\xe2\x94\x82\xc2\xa0\xc2\xa0     \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0     \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task1.py\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 state2\n    \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n    \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 family1\n    \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n    \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task1.py\n    \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task2.py\n    \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 family2\n        \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n        \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task1.py\n        \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task2.py\n
Run Code Online (Sandbox Code Playgroud)\n\n

.根目录是当前工作目录,名为project

\n\n

每个taskn.py(task1.py、task2.py 等)都是单独的任务。每个任务文件看起来都是这样的:

\n\n
from celery import Celery\nfrom celery.result import AsyncResult\nfrom kombu import Queue\n\n_name_ = "project_x"\ncelapp=Celery(backend=\'redis://localhost:6379/0\', broker=\'amqp://a:b@localhost/a_vhost\')\nCELERY_CONFIG = {\n    \'CELERY_DEFAULT_QUEUE\': \'default\',\n    \'CELERY_QUEUES\': (Queue(\'q1\'), Queue(\'q2\'),),\n    \'CELERY_TASK_SERIALIZER\': \'pickle\',\n    \'CELERY_ACCEPT_CONTENT\': [\'json\',\'pickle\']\n}\n\ncelapp.conf.update(**CELERY_CONFIG)\n\n@celapp.task()\ndef t1():\n    print("starting task")\n    time.sleep(5)\n    print("Finished task")\n
Run Code Online (Sandbox Code Playgroud)\n\n

以下为内容entry.py

\n\n
import json\nfrom flask_cors import CORS\nfrom flask import Flask, Response, render_template\nfrom flask import request, jsonify, redirect\nfrom functools import wraps\n<what would be the import statement to import all the tasks>\n\n_name_ = "project_x"\napp     = Flask(_name_)\n\n@app.route("/api1", methods=[\'POST\'])\ndef api1():\n    req = request.jsonify\n    if not req:\n        return jsonify(success=False, msg="Missing request parameters", code="1")\n    else:\n        param1 = req.get(\'p1\')\n        param2 = req.get(\'p2\')\n        tId = startTask()\n        return jsonify(success="True", msg="All Good", taskId=tId)\n\n\ndef startTask():\n    tId = "abcd123"\n    created_task = state1.family1.task1.subtask(queue=\'q1\')\n    created_task.delay()\n    return tId\n\n\nif __name__ == \'__main__\':\n    app.run(debug=True, host="192.168.1.7", port="4444")\n
Run Code Online (Sandbox Code Playgroud)\n\n

Entry.py 是 Fl​​ask 应用程序,其中 api1 将被触发,然后根据我想要启动特定任务的参数。

\n\n

现在我的问题是:

\n\n
    \n
  1. entry.py导入文件中所有任务的导入语句是什么
  2. \n
  3. 我从哪里开始工作。我的意思是我应该从哪个目录启动Celery -A <directory name> worker -l info命令,为什么?
  4. \n
  5. 在许多示例中,我看到任务和 CeleryApp 文件之间存在明显的分离。有人可以建议什么是更好的方式来安排我的任务和芹菜配置等,以及上述两个问题如何与这个新提出的结构相一致?
  6. \n
\n

Pat*_*cio 5

好的,希望这会有所帮助。我会按照你的要求反向回答。

\n\n
\n

在许多示例中,我发现任务和 CeleryApp 文件之间存在明显的隔离。有人可以建议\n 更好的方式来安排我的任务和芹菜配置等吗?\n 上述两个问题如何与这个新提出的结构保持一致?

\n
\n\n

我在您添加的片段中看到的第一个问题是,taskn.py您拥有的每个片段都有自己的celery. 您需要在每个之间共享此实例taskn.py。\n我建议创建一个celery_app.py

\n\n
my_app\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 entry.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 celery_app.py\n\xe2\x94\x82   \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 ...\n
Run Code Online (Sandbox Code Playgroud)\n\n

在此文件中,您将创建 celery 实例

\n\n
from celery import Celery\nfrom celery.result import AsyncResult\nfrom kombu import Queue\n\n_name_ = "project_x"\ncelapp=Celery(backend=\'redis://localhost:6379/0\', broker=\'amqp://a:b@localhost/a_vhost\')\nCELERY_CONFIG = {\n    \'CELERY_DEFAULT_QUEUE\': \'default\',\n    \'CELERY_QUEUES\': (Queue(\'q1\'), Queue(\'q2\'),),\n    \'CELERY_TASK_SERIALIZER\': \'pickle\',\n    \'CELERY_ACCEPT_CONTENT\': [\'json\',\'pickle\']\n}\n\ncelapp.conf.update(**CELERY_CONFIG)\ncelery_app.conf.imports = [\n    \'state1.family1.task1\',\n    \'my_app.state1.family1.task2\',  # Or Maybe\n    ...\n]\n
Run Code Online (Sandbox Code Playgroud)\n\n

然后在每个taskn.py你可以导入这个实例,并且每个任务将被注册在同一个芹菜应用程序下

\n\n
from my_app.celery_app import celapp\n\n@celapp.task()\ndef t1():\n    print("starting task")\n    time.sleep(5)\n    print("Finished task")\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n

我从哪里开始工作。我的意思是我应该从哪个目录启动 Celery -A worker -l info 命令,为什么?

\n
\n\n

然后你应该很容易调用Celery -A my_app.celery_app worker -l info,因为你的 celery 实例将位于模块 my_app,子模块 celery_app 中

\n\n
\n

导入entry.py中所有任务的导入语句是什么

\n
\n\n

最后,entry.py您可以执行import state1.family1.task1 import t1并调用t1.delay()任何已注册的任务。

\n