qre*_*0ct 3 python celery celery-task celeryd
我需要一些关于 Celery 工人的帮助。我特别无法理解 celery 工作命令需要从哪里(哪个目录)被触发,它背后的概念是什么以及有关导入的一些事情。
\n\n假设我有以下目录结构:
\n\n.\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 entry.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 state1\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 family1\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task1.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task2.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task3.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 family2\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task1.py\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 state2\n \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 family1\n \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task1.py\n \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task2.py\n \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 family2\n \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task1.py\n \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task2.py\nRun Code Online (Sandbox Code Playgroud)\n\n.根目录是当前工作目录,名为project
每个taskn.py(task1.py、task2.py 等)都是单独的任务。每个任务文件看起来都是这样的:
\n\nfrom celery import Celery\nfrom celery.result import AsyncResult\nfrom kombu import Queue\n\n_name_ = "project_x"\ncelapp=Celery(backend=\'redis://localhost:6379/0\', broker=\'amqp://a:b@localhost/a_vhost\')\nCELERY_CONFIG = {\n \'CELERY_DEFAULT_QUEUE\': \'default\',\n \'CELERY_QUEUES\': (Queue(\'q1\'), Queue(\'q2\'),),\n \'CELERY_TASK_SERIALIZER\': \'pickle\',\n \'CELERY_ACCEPT_CONTENT\': [\'json\',\'pickle\']\n}\n\ncelapp.conf.update(**CELERY_CONFIG)\n\n@celapp.task()\ndef t1():\n print("starting task")\n time.sleep(5)\n print("Finished task")\nRun Code Online (Sandbox Code Playgroud)\n\n以下为内容entry.py:
import json\nfrom flask_cors import CORS\nfrom flask import Flask, Response, render_template\nfrom flask import request, jsonify, redirect\nfrom functools import wraps\n<what would be the import statement to import all the tasks>\n\n_name_ = "project_x"\napp = Flask(_name_)\n\n@app.route("/api1", methods=[\'POST\'])\ndef api1():\n req = request.jsonify\n if not req:\n return jsonify(success=False, msg="Missing request parameters", code="1")\n else:\n param1 = req.get(\'p1\')\n param2 = req.get(\'p2\')\n tId = startTask()\n return jsonify(success="True", msg="All Good", taskId=tId)\n\n\ndef startTask():\n tId = "abcd123"\n created_task = state1.family1.task1.subtask(queue=\'q1\')\n created_task.delay()\n return tId\n\n\nif __name__ == \'__main__\':\n app.run(debug=True, host="192.168.1.7", port="4444")\nRun Code Online (Sandbox Code Playgroud)\n\nEntry.py 是 Flask 应用程序,其中 api1 将被触发,然后根据我想要启动特定任务的参数。
\n\n现在我的问题是:
\n\nentry.py导入文件中所有任务的导入语句是什么Celery -A <directory name> worker -l info命令,为什么?好的,希望这会有所帮助。我会按照你的要求反向回答。
\n\n\n\n\n在许多示例中,我发现任务和 CeleryApp 文件之间存在明显的隔离。有人可以建议\n 更好的方式来安排我的任务和芹菜配置等吗?\n 上述两个问题如何与这个新提出的结构保持一致?
\n
我在您添加的片段中看到的第一个问题是,taskn.py您拥有的每个片段都有自己的celery. 您需要在每个之间共享此实例taskn.py。\n我建议创建一个celery_app.py
my_app\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 entry.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 celery_app.py\n\xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 ...\nRun Code Online (Sandbox Code Playgroud)\n\n在此文件中,您将创建 celery 实例
\n\nfrom celery import Celery\nfrom celery.result import AsyncResult\nfrom kombu import Queue\n\n_name_ = "project_x"\ncelapp=Celery(backend=\'redis://localhost:6379/0\', broker=\'amqp://a:b@localhost/a_vhost\')\nCELERY_CONFIG = {\n \'CELERY_DEFAULT_QUEUE\': \'default\',\n \'CELERY_QUEUES\': (Queue(\'q1\'), Queue(\'q2\'),),\n \'CELERY_TASK_SERIALIZER\': \'pickle\',\n \'CELERY_ACCEPT_CONTENT\': [\'json\',\'pickle\']\n}\n\ncelapp.conf.update(**CELERY_CONFIG)\ncelery_app.conf.imports = [\n \'state1.family1.task1\',\n \'my_app.state1.family1.task2\', # Or Maybe\n ...\n]\nRun Code Online (Sandbox Code Playgroud)\n\n然后在每个taskn.py你可以导入这个实例,并且每个任务将被注册在同一个芹菜应用程序下
from my_app.celery_app import celapp\n\n@celapp.task()\ndef t1():\n print("starting task")\n time.sleep(5)\n print("Finished task")\nRun Code Online (Sandbox Code Playgroud)\n\n\n\n\n我从哪里开始工作。我的意思是我应该从哪个目录启动 Celery -A worker -l info 命令,为什么?
\n
然后你应该很容易调用Celery -A my_app.celery_app worker -l info,因为你的 celery 实例将位于模块 my_app,子模块 celery_app 中
\n\n\n导入entry.py中所有任务的导入语句是什么
\n
最后,entry.py您可以执行import state1.family1.task1 import t1并调用t1.delay()任何已注册的任务。
| 归档时间: |
|
| 查看次数: |
11458 次 |
| 最近记录: |