标签: celery-task

优雅地停止芹菜任务

我想优雅地退出芹菜任务(即不是通过电话revoke(celery_task_id, terminate=True)).我以为我会向设置标志的任务发送消息,以便任务函数可以返回.与任务沟通的最佳方式是什么?

python celery celery-task django-celery

3
推荐指数
2
解决办法
8105
查看次数

Django Celery delay() 总是推送到默认的“celery”队列

我正在用这个扯掉我的头发。

我的问题的症结在于,CELERY_DEFAULT_QUEUE在我的 Django设置中使用settings.py我的任务不会强制我的任务进入我设置的特定队列。它总是进入celery我的经纪人的默认队列。

但是,如果我queue=proj:devshared_task装饰器中指定,它会进入正确的队列。它的行为符合预期。

我的设置如下:

  • 我本地主机上的 Django 代码(用于测试和其他东西)。.delay()通过 Django 的 shell ( manage.py shell)执行任务
  • 配置为我的代理的远程 Redis 实例
  • 在远程机器上配置的 2 名 celery 工作人员正在等待来自 Redis 的消息(在 Google App Engine 上 - 可能无关紧要)

注意:对于下面的代码片段,我隐藏了项目名称并用作proj占位符。

celery.py

from __future__ import absolute_import, unicode_literals

import os
from celery import Celery, shared_task

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

app.config_from_object('django.conf:settings', namespace='CELERY', force=True)

app.autodiscover_tasks()


@shared_task
def add(x, y):
    return x + y
Run Code Online (Sandbox Code Playgroud)

settings.py

...
CELERY_RESULT_BACKEND = …
Run Code Online (Sandbox Code Playgroud)

python django celery celery-task django-celery

3
推荐指数
1
解决办法
2381
查看次数

将字符串中的字节发送到 Redis,以便将其用作 Celery 任务的参数

我想使用 Redis 作为代理将字符串中的字节发送到 Celery 的任务,但我收到如下所示的错误:

[2017-06-17 21:27:13,826] ERROR in app: Exception on /endpoint_method [POST]
Traceback (most recent call last):
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/flask/app.py", line 1982, in wsgi_app
    response = self.full_dispatch_request()
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/flask/app.py", line 1614, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/flask/app.py", line 1517, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/flask/app.py", line 1612, in full_dispatch_request
    rv = self.dispatch_request()
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/flask/app.py", line 1598, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/Users/developer/my_project/application.py", line 23, in endpoint_method
    task = my_task.execute.delay(request.data)
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/celery/app/task.py", line 412, in delay
    return self.apply_async(args, …
Run Code Online (Sandbox Code Playgroud)

python redis celery celery-task binascii

3
推荐指数
1
解决办法
1565
查看次数

Celery Worker从哪个目录开始

我需要一些关于 Celery 工人的帮助。我特别无法理解 celery 工作命令需要从哪里(哪个目录)被触发,它背后的概念是什么以及有关导入的一些事情。

\n\n

假设我有以下目录结构:

\n\n
.\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 entry.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 state1\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 family1\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task1.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task2.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task3.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 family2\n\xe2\x94\x82\xc2\xa0\xc2\xa0     \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0     \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task1.py\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 state2\n    \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n    \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 family1\n    \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n    \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task1.py\n    \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task2.py\n    \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 family2\n        \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n        \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 task1.py\n        \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 task2.py\n
Run Code Online (Sandbox Code Playgroud)\n\n

.根目录是当前工作目录,名为project

\n\n

每个taskn.py(task1.py、task2.py 等)都是单独的任务。每个任务文件看起来都是这样的:

\n\n
from celery import Celery\nfrom celery.result import AsyncResult\nfrom kombu import Queue\n\n_name_ = "project_x"\ncelapp=Celery(backend=\'redis://localhost:6379/0\', broker=\'amqp://a:b@localhost/a_vhost\')\nCELERY_CONFIG = {\n    \'CELERY_DEFAULT_QUEUE\': \'default\',\n …
Run Code Online (Sandbox Code Playgroud)

python celery celery-task celeryd

3
推荐指数
1
解决办法
1万
查看次数

使用 pytest 同时运行 celery 任务

我正在尝试在 Django 应用程序中集成测试并发 celery 任务。我希望该任务在 pytest 集成测试期间实际上在工作人员上同时运行,但我在实现该工作时遇到了困难。

假设我有以下基本芹菜任务:

@shared_task
def sleep_task(secs):
    print(f"Sleeping for {secs} seconds...")
    for i in range(secs):
        time.sleep(i)
        print(f"\t{i + 1}")
    return "DONE"
Run Code Online (Sandbox Code Playgroud)

在脚本中同时运行任务效果很好:

# script
sleep_task.delay(3)
sleep_task.delay(3)

# output
Sleeping for 3 seconds...
Sleeping for 3 seconds...
1
1
2
2
3
3
Run Code Online (Sandbox Code Playgroud)

但是,我无法在单元测试中复制这种异步行为。我在其中conftest.py设置了代理和结果后端:

import pytest

pytest_plugins = ("celery.contrib.pytest",)

@pytest.fixture(scope="session")
def celery_config():
    return {"broker_url": "memory://", "result_backend": "rpc://"}
Run Code Online (Sandbox Code Playgroud)

这是我的单元测试。celery_session_appcelery_session_worker设置用于测试的 celery 应用程序和工作人员(文档):

def test_concurrent_sleep_tasks(celery_session_app, celery_session_worker):
    sleep_task.delay(3)
    sleep_task.delay(3)

# output
Sleeping for …
Run Code Online (Sandbox Code Playgroud)

python concurrency asynchronous celery celery-task

3
推荐指数
1
解决办法
2422
查看次数

在flask的celery文档中,为什么celery任务需要名称?

在文档中,@celery.task 装饰器没有传递参数,但在 GitHub 示例中,它被命名为“tasks.add”。为什么?当我删除名称时,该示例不再有效,抱怨

KeyError: '__main__.add'

[1] http://flask.pocoo.org/docs/0.10/patterns/celery/ [2] https://github.com/thrisp/flask-celery-example/blob/master/app.py#L25

python celery flask celery-task

2
推荐指数
1
解决办法
2369
查看次数

在 Celery 链中使用分组结果

我陷入了相对复杂的芹菜链配置,试图实现以下目标。假设有如下一系列任务:

chain1 = chain(
    DownloadFile.s("http://someserver/file.gz"), # downloads file, returns temp file name
    UnpackFile.s(), # unpacks the gzip comp'd file, returns temp file name
    ParseFile.s(), # parses file, returns list URLs to download
)
Run Code Online (Sandbox Code Playgroud)

现在我想并行下载每个 URL,所以我所做的是:

urls = chain1.get()
download_tasks = map(lambda x: DownloadFile.s(x), urls)
res1 = celery.group(download_tasks)()
res1_data = res1.get()
Run Code Online (Sandbox Code Playgroud)

最后,我想获取每个下载的文件(从 中返回一个临时文件名DownloadFileParseFile,并通过另一个任务链并行运行它(例如,它将是 a groupof chains):

chains = []
for tmpfile in res:
    chains.append(celery.chain(
        foo.s(tmpfile),
        bar.s(),
        baz.s()
    ))

res2 = celery.group(*chains)()
res2_data = res2.get()
Run Code Online (Sandbox Code Playgroud)

如果我在正常的 …

python celery celery-task

2
推荐指数
1
解决办法
6023
查看次数

气流:如何在多个工作人员上运行任务

我刚刚设置了芹菜执行器的气流,这是我的DAG的骨架

dag = DAG('dummy_for_testing', default_args=default_args)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

t3 = BashOperator(
    task_id='print_host',
    bash_command='hostname >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t2 = BashOperator(
    task_id='print_uptime',
    bash_command='uptime >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t2.set_upstream(t3)
t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)

我有2个工人.其中一个只运行一个被调用local_queue的队列,另一个运行两个名为的队列local_queue,test_queue

我想在一台机器上运行任务1,但在两台机器上运行任务2和3.即,在仅运行local_queue的worker 1上运行t2和t3,在运行local_queue和test_queue的worker 2上运行所有3(t1,t2和t3).任务运行总数应为5.

但是,当我运行它时,只运行3个任务.1)print_date为worker 2运行(这是正确的)2)print_host仅针对worker 1运行(不正确.应该为两个worker运行)和3)print_uptime仅针对worker 2运行(也是不正确的.应该为两个worker运行)

能否指导我如何设置它以便运行5个任务.在生产中,我想通过将机器分组到队列中以及所有具有QUEUE_A - > do X且所有机器具有QUEUE_B - > do Y等的机器来管理机器.

谢谢

celery celery-task airflow airflow-scheduler

2
推荐指数
1
解决办法
3342
查看次数

如何调用 Celery shared_task?

我正在尝试在我的应用程序(不是 Django)中使用 stream_framework但我在调用 stream_framework 共享任务时遇到问题。Celery 似乎找到了任务:

-------------- celery@M3800 v3.1.25 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-4.15.0-34-generic-x86_64-with-Ubuntu-18.04-bionic
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         task:0x7f8d22176dd8
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . formshare.processes.feeds.tasks.test_shared_task
  . stream_framework.tasks.fanout_operation
  . stream_framework.tasks.fanout_operation_hi_priority
  . stream_framework.tasks.fanout_operation_low_priority
  . stream_framework.tasks.follow_many …
Run Code Online (Sandbox Code Playgroud)

celery celery-task stream-framework

2
推荐指数
1
解决办法
3271
查看次数

如何解决芹菜文件中关键字参数namespace =“'CELERY”错误的芹菜工人配置

我有一个名为ShippingApp的项目,并且按照以下步骤设置了芹菜工人。我将celery 3.1.26.post2与python3.7配合使用,当我想启动Celery Worker时,出现以下错误:

E:\ShippingApp>celery -A ShippingApp worker -l info
Traceback (most recent call last):
  File "c:\program files\python37\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "c:\program files\python37\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\Program Files\Python37\Scripts\celery.exe\__main__.py", line 9, in <module>
  File "c:\program files\python37\lib\site-packages\celery\__main__.py", line 30, in main
    main()
  File "c:\program files\python37\lib\site-packages\celery\bin\celery.py", line 81, in main
    cmd.execute_from_commandline(argv)
  File "c:\program files\python37\lib\site-packages\celery\bin\celery.py", line 793, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "c:\program files\python37\lib\site-packages\celery\bin\base.py", line 309, in execute_from_commandline
    argv = self.setup_app_from_commandline(argv)
  File "c:\program files\python37\lib\site-packages\celery\bin\base.py", line 469, in …
Run Code Online (Sandbox Code Playgroud)

python django pycharm celery-task

2
推荐指数
1
解决办法
1848
查看次数