我想优雅地退出芹菜任务(即不是通过电话revoke(celery_task_id, terminate=True)).我以为我会向设置标志的任务发送消息,以便任务函数可以返回.与任务沟通的最佳方式是什么?
我正在用这个扯掉我的头发。
我的问题的症结在于,CELERY_DEFAULT_QUEUE在我的 Django设置中使用settings.py我的任务不会强制我的任务进入我设置的特定队列。它总是进入celery我的经纪人的默认队列。
但是,如果我queue=proj:dev在shared_task装饰器中指定,它会进入正确的队列。它的行为符合预期。
我的设置如下:
.delay()通过 Django 的 shell ( manage.py shell)执行任务注意:对于下面的代码片段,我隐藏了项目名称并用作proj占位符。
celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, shared_task
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
app.config_from_object('django.conf:settings', namespace='CELERY', force=True)
app.autodiscover_tasks()
@shared_task
def add(x, y):
return x + y
Run Code Online (Sandbox Code Playgroud)
settings.py
...
CELERY_RESULT_BACKEND = …Run Code Online (Sandbox Code Playgroud) 我想使用 Redis 作为代理将字符串中的字节发送到 Celery 的任务,但我收到如下所示的错误:
[2017-06-17 21:27:13,826] ERROR in app: Exception on /endpoint_method [POST]
Traceback (most recent call last):
File "/Users/developer/my_project/venv/lib/python2.7/site-packages/flask/app.py", line 1982, in wsgi_app
response = self.full_dispatch_request()
File "/Users/developer/my_project/venv/lib/python2.7/site-packages/flask/app.py", line 1614, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/Users/developer/my_project/venv/lib/python2.7/site-packages/flask/app.py", line 1517, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/Users/developer/my_project/venv/lib/python2.7/site-packages/flask/app.py", line 1612, in full_dispatch_request
rv = self.dispatch_request()
File "/Users/developer/my_project/venv/lib/python2.7/site-packages/flask/app.py", line 1598, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/Users/developer/my_project/application.py", line 23, in endpoint_method
task = my_task.execute.delay(request.data)
File "/Users/developer/my_project/venv/lib/python2.7/site-packages/celery/app/task.py", line 412, in delay
return self.apply_async(args, …Run Code Online (Sandbox Code Playgroud) 我需要一些关于 Celery 工人的帮助。我特别无法理解 celery 工作命令需要从哪里(哪个目录)被触发,它背后的概念是什么以及有关导入的一些事情。
\n\n假设我有以下目录结构:
\n\n.\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 entry.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 state1\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 family1\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task1.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task2.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task3.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 family2\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task1.py\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 state2\n \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 family1\n \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task1.py\n \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task2.py\n \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 family2\n \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task1.py\n \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task2.py\nRun Code Online (Sandbox Code Playgroud)\n\n.根目录是当前工作目录,名为project
每个taskn.py(task1.py、task2.py 等)都是单独的任务。每个任务文件看起来都是这样的:
\n\nfrom celery import Celery\nfrom celery.result import AsyncResult\nfrom kombu import Queue\n\n_name_ = "project_x"\ncelapp=Celery(backend=\'redis://localhost:6379/0\', broker=\'amqp://a:b@localhost/a_vhost\')\nCELERY_CONFIG = {\n \'CELERY_DEFAULT_QUEUE\': \'default\',\n …Run Code Online (Sandbox Code Playgroud) 我正在尝试在 Django 应用程序中集成测试并发 celery 任务。我希望该任务在 pytest 集成测试期间实际上在工作人员上同时运行,但我在实现该工作时遇到了困难。
假设我有以下基本芹菜任务:
@shared_task
def sleep_task(secs):
print(f"Sleeping for {secs} seconds...")
for i in range(secs):
time.sleep(i)
print(f"\t{i + 1}")
return "DONE"
Run Code Online (Sandbox Code Playgroud)
在脚本中同时运行任务效果很好:
# script
sleep_task.delay(3)
sleep_task.delay(3)
# output
Sleeping for 3 seconds...
Sleeping for 3 seconds...
1
1
2
2
3
3
Run Code Online (Sandbox Code Playgroud)
但是,我无法在单元测试中复制这种异步行为。我在其中conftest.py设置了代理和结果后端:
import pytest
pytest_plugins = ("celery.contrib.pytest",)
@pytest.fixture(scope="session")
def celery_config():
return {"broker_url": "memory://", "result_backend": "rpc://"}
Run Code Online (Sandbox Code Playgroud)
这是我的单元测试。celery_session_app并celery_session_worker设置用于测试的 celery 应用程序和工作人员(文档):
def test_concurrent_sleep_tasks(celery_session_app, celery_session_worker):
sleep_task.delay(3)
sleep_task.delay(3)
# output
Sleeping for …Run Code Online (Sandbox Code Playgroud) 在文档中,@celery.task 装饰器没有传递参数,但在 GitHub 示例中,它被命名为“tasks.add”。为什么?当我删除名称时,该示例不再有效,抱怨
KeyError: '__main__.add'
[1] http://flask.pocoo.org/docs/0.10/patterns/celery/ [2] https://github.com/thrisp/flask-celery-example/blob/master/app.py#L25
我陷入了相对复杂的芹菜链配置,试图实现以下目标。假设有如下一系列任务:
chain1 = chain(
DownloadFile.s("http://someserver/file.gz"), # downloads file, returns temp file name
UnpackFile.s(), # unpacks the gzip comp'd file, returns temp file name
ParseFile.s(), # parses file, returns list URLs to download
)
Run Code Online (Sandbox Code Playgroud)
现在我想并行下载每个 URL,所以我所做的是:
urls = chain1.get()
download_tasks = map(lambda x: DownloadFile.s(x), urls)
res1 = celery.group(download_tasks)()
res1_data = res1.get()
Run Code Online (Sandbox Code Playgroud)
最后,我想获取每个下载的文件(从 中返回一个临时文件名DownloadFile)ParseFile,并通过另一个任务链并行运行它(例如,它将是 a groupof chains):
chains = []
for tmpfile in res:
chains.append(celery.chain(
foo.s(tmpfile),
bar.s(),
baz.s()
))
res2 = celery.group(*chains)()
res2_data = res2.get()
Run Code Online (Sandbox Code Playgroud)
如果我在正常的 …
我刚刚设置了芹菜执行器的气流,这是我的DAG的骨架
dag = DAG('dummy_for_testing', default_args=default_args)
t1 = BashOperator(
task_id='print_date',
bash_command='date >> /tmp/dag_output.log',
queue='test_queue',
dag=dag)
t3 = BashOperator(
task_id='print_host',
bash_command='hostname >> /tmp/dag_output.log',
queue='local_queue',
dag=dag)
t2 = BashOperator(
task_id='print_uptime',
bash_command='uptime >> /tmp/dag_output.log',
queue='local_queue',
dag=dag)
t2.set_upstream(t3)
t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)
我有2个工人.其中一个只运行一个被调用local_queue的队列,另一个运行两个名为的队列local_queue,test_queue
我想在一台机器上运行任务1,但在两台机器上运行任务2和3.即,在仅运行local_queue的worker 1上运行t2和t3,在运行local_queue和test_queue的worker 2上运行所有3(t1,t2和t3).任务运行总数应为5.
但是,当我运行它时,只运行3个任务.1)print_date为worker 2运行(这是正确的)2)print_host仅针对worker 1运行(不正确.应该为两个worker运行)和3)print_uptime仅针对worker 2运行(也是不正确的.应该为两个worker运行)
能否指导我如何设置它以便运行5个任务.在生产中,我想通过将机器分组到队列中以及所有具有QUEUE_A - > do X且所有机器具有QUEUE_B - > do Y等的机器来管理机器.
谢谢
我正在尝试在我的应用程序(不是 Django)中使用 stream_framework但我在调用 stream_framework 共享任务时遇到问题。Celery 似乎找到了任务:
-------------- celery@M3800 v3.1.25 (Cipater)
---- **** -----
--- * *** * -- Linux-4.15.0-34-generic-x86_64-with-Ubuntu-18.04-bionic
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: task:0x7f8d22176dd8
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. formshare.processes.feeds.tasks.test_shared_task
. stream_framework.tasks.fanout_operation
. stream_framework.tasks.fanout_operation_hi_priority
. stream_framework.tasks.fanout_operation_low_priority
. stream_framework.tasks.follow_many …Run Code Online (Sandbox Code Playgroud) 我有一个名为ShippingApp的项目,并且按照以下步骤设置了芹菜工人。我将celery 3.1.26.post2与python3.7配合使用,当我想启动Celery Worker时,出现以下错误:
E:\ShippingApp>celery -A ShippingApp worker -l info
Traceback (most recent call last):
File "c:\program files\python37\lib\runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "c:\program files\python37\lib\runpy.py", line 85, in _run_code
exec(code, run_globals)
File "C:\Program Files\Python37\Scripts\celery.exe\__main__.py", line 9, in <module>
File "c:\program files\python37\lib\site-packages\celery\__main__.py", line 30, in main
main()
File "c:\program files\python37\lib\site-packages\celery\bin\celery.py", line 81, in main
cmd.execute_from_commandline(argv)
File "c:\program files\python37\lib\site-packages\celery\bin\celery.py", line 793, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "c:\program files\python37\lib\site-packages\celery\bin\base.py", line 309, in execute_from_commandline
argv = self.setup_app_from_commandline(argv)
File "c:\program files\python37\lib\site-packages\celery\bin\base.py", line 469, in …Run Code Online (Sandbox Code Playgroud) celery-task ×10
celery ×9
python ×8
django ×2
airflow ×1
asynchronous ×1
binascii ×1
celeryd ×1
concurrency ×1
flask ×1
pycharm ×1
redis ×1