我是芹菜的新手,并开始了解它是如何工作的。但是我在理解如何从类方法内部更新状态时遇到了一些麻烦。
views.py
@app.route('/',methods=['POST'])
def foo_bar():
task = foo_async.apply_async()
return json.dumps({}),202
Run Code Online (Sandbox Code Playgroud)
background_task.py
@celery.task(bind=True)
def foo_async(self):
t = Test()
t.run()
return json.dumps({'progress':100})
Run Code Online (Sandbox Code Playgroud)
test.py
class Test(Task):
def __init__(self):
self.foo = 'bar'
def run(self):
for i in range(0,10):
print 'Current : ',i
self.update_state(state='PROGRESS',meta={'current':i})
time.sleep(4)
Run Code Online (Sandbox Code Playgroud)
但是我发出请求后收到此错误:
[...] return task_id.replace('-','')
AttributeError: 'NoneType' object has no attribute 'replace'
Run Code Online (Sandbox Code Playgroud)
因此,我看到问题出在关于ID的问题,但我不知道如何解决。
也许有更好的方法从我的方法中获取更新?
我试图在具有systemd / systemctl的Centos 7上运行celery守护程序。它不起作用。
有什么建议解决此问题吗?
这是我的守护程序默认配置:
CELERYD_NODES="localhost.localdomain"
CELERY_BIN="/tmp/myapp/venv/bin/celery"
CELERY_APP="pipeline"
CELERYD_OPTS="--broker=amqp://192.168.168.111/"
CELERYD_LOG_LEVEL="INFO"
CELERYD_CHDIR="/tmp/myapp"
CELERYD_USER="root"
Run Code Online (Sandbox Code Playgroud)
注意:我使用以下命令启动守护进程
sudo /etc/init.d/celeryd start
Run Code Online (Sandbox Code Playgroud)
我从以下网址获得了celery守护程序脚本:https : //raw.githubusercontent.com/celery/celery/3.1/extra/generic-init.d/celeryd
我还尝试了以下方法之一:https : //raw.githubusercontent.com/celery/celery/3.1/extra/generic-init.d/celeryd, 但这在尝试启动守护程序时向我显示了一个错误:
systemd[1]: Starting LSB: celery task worker daemon...
celeryd[19924]: basename: missing operand
celeryd[19924]: Try 'basename --help' for more information.
celeryd[19924]: Starting : /etc/rc.d/init.d/celeryd: line 193: multi: command not found
celeryd[19924]: [FAILED]
systemd[1]: celeryd.service: control process exited, code=exited status=1
systemd[1]: Failed to start LSB: celery task worker daemon.
systemd[1]: Unit celeryd.service …
Run Code Online (Sandbox Code Playgroud) 我们有一个使用Celery的Python应用程序,RabbitMQ作为代理。将此应用程序视为管理应用程序,仅将消息/任务放入代理,而不会对它们进行操作。
将有另一个应用程序(可能基于Python,也可能不基于Python)对消息起作用。
当管理任务的代码库中不存在该任务时,该管理应用程序是否可以将消息/任务放入队列中?如果是这样,我将如何处理?
我试图跟随芹菜doc为django.这是我的项目结构:
??? hiren
? ??? celery_app.py
? ??? __init__.py
? ??? settings.py
? ??? urls.py
? ??? wsgi.py
??? manage.py
??? reminder
? ??? admin.py
? ??? __init__.py
? ??? migrations
? ??? models.py
? ??? serializers.py
? |?? task.py
? |?? tests.py
? |?? views.py
Run Code Online (Sandbox Code Playgroud)
这是我的settings.py文件:
BROKER_URL = 'redis://localhost:6379/4'
CELERYBEAT_SCHEDULE = {
'run-every-5-seconds': {
'task': 'reminder.task.run',
'schedule': timedelta(seconds=5),
'args': (16, 16)
},
}
Run Code Online (Sandbox Code Playgroud)
和reminder/task.py文件:
def run():
print('hello')
Run Code Online (Sandbox Code Playgroud)
当我运行celery -A hiren beat -l debug
命令时,我没有在终端中看到"hello"文本.我错过了什么?
我已经在我的virutalenv中安装celery
和redis
使用.打字请问我做错了什么?pip install redis celery
'djangoscrape'
redis-server
-bash: redis-server: command not found.
还输入:
/Users/Me/.virtualenvs/djangoscrape/bin/celery --app = scraper.celery_tasks:app worker --lvelvel = INFO
结果是:
-------------- celery@MikkyPro v3.1.18 (Cipater)
---- **** -----
--- * *** * -- Darwin-14.5.0-x86_64-i386-64bit
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: scraper:0x1084719d0
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: djcelery.backends.database:DatabaseBackend
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** …
Run Code Online (Sandbox Code Playgroud) 我正在使用python 2.7.10构建一个新的amazon实例作为默认值.在我运行了我的机器配置脚本并且真相到来之后,芹菜给了我一个导入,所以我调试了问题给billard.
包看起来是正确的路径,即
sudo find -name "billiard"
./srv/ia-live/lib64/python2.7/dist-packages/billiard
Run Code Online (Sandbox Code Playgroud)
其中ia-live是我的virtualenv的道路.检查via python virtualenv可执行文件中的路径
import sys
sys.path
['',
'/srv/ia-live/bin',
'/srv/ia-live/src/django-devserver-redux-master',
'/usr/lib/python2.7',
'/srv/ia-live/local/lib64/python2.7/site-packages',
'/srv/ia-live/local/lib/python2.7/site-packages',
'/srv/ia-live/lib64/python2.7',
'/srv/ia-live/lib/python2.7',
'/srv/ia-live/lib64/python2.7/site-packages',
'/srv/ia-live/lib/python2.7/site-packages',
'/srv/ia-live/lib64/python2.7/lib-dynload',
'/srv/ia-live/local/lib/python2.7/dist-packages',
'/srv/ia-live/local/lib/python2.7/dist-packages',
'/srv/ia-live/lib/python2.7/dist-packages',
'/usr/lib64/python2.7',
'/usr/lib/python2.7',
'/srv/ia-live/local/lib/python2.7/dist-packages/IPython/extensions',
'/home/ec2-user/.ipython']
Run Code Online (Sandbox Code Playgroud)
这似乎是正确的,但当我这样做
import billiard
ImportError: No module named billiard
Run Code Online (Sandbox Code Playgroud)
我不明白为什么会有问题
这种配置是正确的.我以错误的方式开始芹菜:(,没有指定项目名称.(芹菜工人-A hockey_manager -l info
我从1.6.5升级到Django 1.9,我不能再让芹菜配置工作了.
经过近两天寻找解决方案后,我没有找到任何工作.
芹菜没有检测到我的任务.我尝试过:
依赖
amqp==2.0.3
celery==3.1.23
Django==1.9.8
django-celery==3.1.17
kombu==3.0.35
Run Code Online (Sandbox Code Playgroud)
项目结构
hockey_manager/__ init__.py
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
Run Code Online (Sandbox Code Playgroud)
hockey_manager/celery.py
from __future__ import absolute_import
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hockey_manager.settings.common')
app = Celery('hockey_manager') …
Run Code Online (Sandbox Code Playgroud) 我目前正在使用LocalExecutor运行多个Airflow DAG,并且运行良好。我的服务器有很多资源。我将为一个更大的项目添加一个新的DAG,并且我正在考虑从LocalExecutor切换到CeleryExecutor。
我的问题是,我应该改用CeleryExecutor有哪些迹象?我是否应该查看特定的性能指标,以了解何时需要开始向外扩展?
正如在docs类中看到的那样,任务是表达复杂逻辑的公平方式.
但是,文档没有指定如何将基于闪亮的新创建的基于类的任务添加到您CELERY_BEAT_SCHEDULE
(使用django)
我尝试过:
celery.py
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, 'task_summary')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
from payments.tasks.generic.payeer import PayeerPaymentChecker
from payments.tasks.generic.ok_pay import OkPayPaymentChecker
okpay_import = OkPayPaymentChecker()
payeer_imprt = PayeerPaymentChecker()
sender.add_periodic_task(60.0, okpay_import.s(),
name='OkPay import',
expires=30)
sender.add_periodic_task(60.0, payeer_imprt.s(),
name='Payeer import',
expires=30)
Run Code Online (Sandbox Code Playgroud)
- 要么 -
payments/task_summary.py
from tasks.generic.import import OkPayPaymentChecker, PayeerPaymentChecker
run_okpay = OkPayPaymentChecker()
run_payeer = PayeerPaymentChecker()
CELERY_BEAT_SCHEDULE = {
# yes, i did try referring to the class here
'check_okpay_payments': {
'task': 'payments.tasks.task_summary.run_okpay',
'schedule': timedelta(seconds=60),
},
'check_payeer_payments': {
'task': 'payments.task_summary.run_payeer',
'schedule': timedelta(seconds=60), …
Run Code Online (Sandbox Code Playgroud) 我有一个调用的python文件tasks.py
,其中我定义了4个单独的任务.我想配置芹菜以使用4个队列,因为每个队列将分配不同数量的工作人员.我正在阅读我应该使用route_task属性,但我尝试了几个选项而不是成功.
我正在关注这个doc celery route_tasks docs
我的目标是运行4个工作人员,每个任务一个,不要混合不同队列中不同工作人员的任务.这是可能的?这是一个很好的方法吗?
如果我做错了什么,我很乐意改变我的代码以使其工作
到目前为止,这是我的配置
tasks.py
app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
Queue('queueA', routing_key='tasks.task_1'),
Queue('queueB', routing_key='tasks.task_2'),
Queue('queueC', routing_key='tasks.task_3'),
Queue('queueD', routing_key='tasks.task_4')
)
@app.task
def task_1():
print "Task of level 1"
@app.task
def task_2():
print "Task of level 2"
@app.task
def task_3():
print "Task of level 3"
@app.task
def task_4():
print "Task of level 4"
Run Code Online (Sandbox Code Playgroud)
为每个队列运行芹菜一名工人
celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug …
Run Code Online (Sandbox Code Playgroud) celery ×10
python ×7
django ×5
python-2.7 ×2
airflow ×1
amazon-ec2 ×1
celerybeat ×1
centos7 ×1
daemon ×1
flask ×1
rabbitmq ×1
redis ×1
systemd ×1
virtualenv ×1