标签: celery

Celery update_state类中的方法

我是芹菜的新手,并开始了解它是如何工作的。但是我在理解如何从类方法内部更新状态时遇到了一些麻烦。

views.py

@app.route('/',methods=['POST'])
def foo_bar():
    task = foo_async.apply_async()
    return json.dumps({}),202
Run Code Online (Sandbox Code Playgroud)

background_task.py

@celery.task(bind=True)
def foo_async(self):
    t = Test()
    t.run()
    return json.dumps({'progress':100})
Run Code Online (Sandbox Code Playgroud)

test.py

class Test(Task):
    def __init__(self):
        self.foo = 'bar'
    def run(self):
        for i in range(0,10):
            print 'Current : ',i
            self.update_state(state='PROGRESS',meta={'current':i})
            time.sleep(4)
Run Code Online (Sandbox Code Playgroud)

但是我发出请求后收到此错误:

[...] return task_id.replace('-','')  
AttributeError: 'NoneType' object has no attribute 'replace'
Run Code Online (Sandbox Code Playgroud)

因此,我看到问题出在关于ID的问题,但我不知道如何解决。
也许有更好的方法从我的方法中获取更新?

python celery flask

1
推荐指数
1
解决办法
3280
查看次数

Celery Daemon在Centos 7上不起作用

我试图在具有systemd / systemctl的Centos 7上运行celery守护程序。它不起作用。

  • 我尝试了一个非守护程序的案例,它奏效了
  • 我运行了〜mytask,它在客户端计算机和运行celery守护程序的服务器上冻结了,我什么都没有记录。
  • 我注意到实际上没有任何芹菜处理程序正在运行。

有什么建议解决此问题吗?

这是我的守护程序默认配置:

CELERYD_NODES="localhost.localdomain"
CELERY_BIN="/tmp/myapp/venv/bin/celery"
CELERY_APP="pipeline"
CELERYD_OPTS="--broker=amqp://192.168.168.111/"
CELERYD_LOG_LEVEL="INFO"
CELERYD_CHDIR="/tmp/myapp"
CELERYD_USER="root"
Run Code Online (Sandbox Code Playgroud)

注意:我使用以下命令启动守护进程

sudo /etc/init.d/celeryd start
Run Code Online (Sandbox Code Playgroud)

我从以下网址获得了celery守护程序脚本:https : //raw.githubusercontent.com/celery/celery/3.1/extra/generic-init.d/celeryd

我还尝试了以下方法之一:https : //raw.githubusercontent.com/celery/celery/3.1/extra/generic-init.d/celeryd, 但这在尝试启动守护程序时向我显示了一个错误:

systemd[1]: Starting LSB: celery task worker daemon...
celeryd[19924]: basename: missing operand
celeryd[19924]: Try 'basename --help' for more information.
celeryd[19924]: Starting : /etc/rc.d/init.d/celeryd: line 193: multi: command not found
celeryd[19924]: [FAILED]
systemd[1]: celeryd.service: control process exited, code=exited status=1
systemd[1]: Failed to start LSB: celery task worker daemon.
systemd[1]: Unit celeryd.service …
Run Code Online (Sandbox Code Playgroud)

python daemon celery systemd centos7

1
推荐指数
1
解决办法
4795
查看次数

如何通过名称调用Celery任务

我们有一个使用Celery的Python应用程序,RabbitMQ作为代理。将此应用程序视为管理应用程序,仅将消息/任务放入代理​​,而不会对它们进行操作。

将有另一个应用程序(可能基于Python,也可能不基于Python)对消息起作用。

当管理任务的代码库中不存在该任务时,该管理应用程序是否可以将消息/任务放入队列中?如果是这样,我将如何处理?

celery python-2.7

1
推荐指数
3
解决办法
2093
查看次数

芹菜周期性任务不起作用

我试图跟随芹菜doc为django.这是我的项目结构:

??? hiren
?   ??? celery_app.py
?   ??? __init__.py
?   ??? settings.py
?   ??? urls.py
?   ??? wsgi.py
??? manage.py
??? reminder
?   ??? admin.py
?   ??? __init__.py
?   ??? migrations
?   ??? models.py
?   ??? serializers.py
?   |?? task.py
?   |?? tests.py
?   |?? views.py
Run Code Online (Sandbox Code Playgroud)

这是我的settings.py文件:

BROKER_URL = 'redis://localhost:6379/4'
CELERYBEAT_SCHEDULE = {
    'run-every-5-seconds': {
        'task': 'reminder.task.run',
        'schedule': timedelta(seconds=5),
        'args': (16, 16)
    },
}
Run Code Online (Sandbox Code Playgroud)

和reminder/task.py文件:

def run():
    print('hello')
Run Code Online (Sandbox Code Playgroud)

当我运行celery -A hiren beat -l debug命令时,我没有在终端中看到"hello"文本.我错过了什么?

python django celery

1
推荐指数
1
解决办法
2006
查看次数

如何让mac用户在virtualenv下运行redis服务器?

我已经在我的virutalenv中安装celeryredis使用.打字请问我做错了什么?pip install redis celery'djangoscrape'redis-server -bash: redis-server: command not found.

还输入:

/Users/Me/.virtualenvs/djangoscrape/bin/celery --app = scraper.celery_tasks:app worker --lvelvel = INFO

结果是:

-------------- celery@MikkyPro v3.1.18 (Cipater)
---- **** ----- 
--- * ***  * -- Darwin-14.5.0-x86_64-i386-64bit
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         scraper:0x1084719d0
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     djcelery.backends.database:DatabaseBackend
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- 
--- ***** …
Run Code Online (Sandbox Code Playgroud)

python django redis celery django-celery

1
推荐指数
1
解决办法
1853
查看次数

与celery和billiard的python virtualenv ImportError

我正在使用python 2.7.10构建一个新的amazon实例作为默认值.在我运行了我的机器配置脚本并且真相到来之后,芹菜给了我一个导入,所以我调试了问题给billard.

包看起来是正确的路径,即

sudo find -name "billiard"
./srv/ia-live/lib64/python2.7/dist-packages/billiard
Run Code Online (Sandbox Code Playgroud)

其中ia-live是我的virtualenv的道路.检查via python virtualenv可执行文件中的路径

import sys
sys.path

['',
 '/srv/ia-live/bin',
 '/srv/ia-live/src/django-devserver-redux-master',
 '/usr/lib/python2.7',
 '/srv/ia-live/local/lib64/python2.7/site-packages',
 '/srv/ia-live/local/lib/python2.7/site-packages',
 '/srv/ia-live/lib64/python2.7',
 '/srv/ia-live/lib/python2.7',
 '/srv/ia-live/lib64/python2.7/site-packages',
 '/srv/ia-live/lib/python2.7/site-packages',
 '/srv/ia-live/lib64/python2.7/lib-dynload',
 '/srv/ia-live/local/lib/python2.7/dist-packages',
 '/srv/ia-live/local/lib/python2.7/dist-packages',
 '/srv/ia-live/lib/python2.7/dist-packages',
 '/usr/lib64/python2.7',
 '/usr/lib/python2.7',
 '/srv/ia-live/local/lib/python2.7/dist-packages/IPython/extensions',
 '/home/ec2-user/.ipython']
Run Code Online (Sandbox Code Playgroud)

这似乎是正确的,但当我这样做

import billiard
ImportError: No module named billiard
Run Code Online (Sandbox Code Playgroud)

我不明白为什么会有问题

django amazon-ec2 virtualenv celery python-2.7

1
推荐指数
1
解决办法
1013
查看次数

Django 1.9 + Celery未注册的任务

这种配置是正确的.我以错误的方式开始芹菜:(,没有指定项目名称.(芹菜工人-A hockey_manager -l info

我从1.6.5升级到Django 1.9,我不能再让芹菜配置工作了.

经过近两天寻找解决方案后,我没有找到任何工作.

芹菜没有检测到我的任务.我尝试过:

  • CELERY_IMPORTS
  • autodiscover_tasks
  • 绑定=真

依赖

amqp==2.0.3 
celery==3.1.23 
Django==1.9.8 
django-celery==3.1.17
kombu==3.0.35
Run Code Online (Sandbox Code Playgroud)

项目结构

在此输入图像描述

hockey_manager/__ init__.py

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
Run Code Online (Sandbox Code Playgroud)

hockey_manager/celery.py

from __future__ import absolute_import

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hockey_manager.settings.common')

app = Celery('hockey_manager') …
Run Code Online (Sandbox Code Playgroud)

python django celery django-celery

1
推荐指数
1
解决办法
975
查看次数

何时将Airflow的执行器从LocalExecutor升级到CeleryExecutor?

我目前正在使用LocalExecutor运行多个Airflow DAG,并且运行良好。我的服务器有很多资源。我将为一个更大的项目添加一个新的DAG,并且我正在考虑从LocalExecutor切换到CeleryExecutor。

我的问题是,我应该改用CeleryExecutor有哪些迹象?我是否应该查看特定的性能指标,以了解何时需要开始向外扩展?

celery airflow

1
推荐指数
1
解决办法
1718
查看次数

如何将基于类的任务传递给CELERY_BEAT_SCHEDULE

正如在docs类中看到的那样,任务是表达复杂逻辑的公平方式.

但是,文档没有指定如何将基于闪亮的新创建的基于类的任务添加到您CELERY_BEAT_SCHEDULE(使用django)

我尝试过: celery.py

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, 'task_summary')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    from payments.tasks.generic.payeer import PayeerPaymentChecker
    from payments.tasks.generic.ok_pay import OkPayPaymentChecker

    okpay_import = OkPayPaymentChecker()
    payeer_imprt = PayeerPaymentChecker()

    sender.add_periodic_task(60.0, okpay_import.s(),
                             name='OkPay import',
                             expires=30)

    sender.add_periodic_task(60.0, payeer_imprt.s(),
                             name='Payeer import',
                             expires=30)
Run Code Online (Sandbox Code Playgroud)

- 要么 -

payments/task_summary.py

from tasks.generic.import import OkPayPaymentChecker, PayeerPaymentChecker
 run_okpay = OkPayPaymentChecker()
 run_payeer = PayeerPaymentChecker()

CELERY_BEAT_SCHEDULE = {
# yes, i did try referring to the class here
'check_okpay_payments': {

    'task': 'payments.tasks.task_summary.run_okpay',
    'schedule': timedelta(seconds=60),
},
'check_payeer_payments': {
    'task': 'payments.task_summary.run_payeer',
    'schedule': timedelta(seconds=60), …
Run Code Online (Sandbox Code Playgroud)

python django celery celerybeat class-based-tasks

1
推荐指数
1
解决办法
2080
查看次数

芹菜一个经纪人多个队列和工人

我有一个调用的python文件tasks.py,其中我定义了4个单独的任务.我想配置芹菜以使用4个队列,因为每个队列将分配不同数量的工作人员.我正在阅读我应该使用route_task属性,但我尝试了几个选项而不是成功.

我正在关注这个doc celery route_tasks docs

我的目标是运行4个工作人员,每个任务一个,不要混合不同队列中不同工作人员的任务.这是可能的?这是一个很好的方法吗?

如果我做错了什么,我很乐意改变我的代码以使其工作

到目前为止,这是我的配置

tasks.py

app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('queueA',    routing_key='tasks.task_1'),
    Queue('queueB',    routing_key='tasks.task_2'),
    Queue('queueC',    routing_key='tasks.task_3'),
    Queue('queueD',    routing_key='tasks.task_4')
)


@app.task
def task_1():
    print "Task of level 1"


@app.task
def task_2():
    print "Task of level 2"


@app.task
def task_3():
    print "Task of level 3"


@app.task
def task_4():
    print "Task of level 4"
Run Code Online (Sandbox Code Playgroud)

为每个队列运行芹菜一名工人

celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug …
Run Code Online (Sandbox Code Playgroud)

python rabbitmq celery

1
推荐指数
1
解决办法
2091
查看次数