我在Celery有一个任务,可以在正常运行的情况下运行10,000秒.但是,我的所有其余任务都应该在不到一秒的时间内完成.如何在不更改短期运行任务的时间限制的情况下为故意长时间运行的任务设置时间限制?
我是celery的新手.我知道如何安装和运行一台服务器,但我需要将任务分配给多台机器.我的项目使用celery将传递给Web框架的用户请求分配给不同的机器,然后返回结果.我阅读了文档,但没有提到如何设置多台机器.我错过了什么?
如果我有一个定义如下的函数:
def add(x,y):
return x+y
Run Code Online (Sandbox Code Playgroud)
有没有办法动态添加这个函数作为芹菜PeriodicTask并在运行时启动它?我希望能够做类似(伪代码)的事情:
some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)
Run Code Online (Sandbox Code Playgroud)
我还想用类似(伪代码)的东西动态地停止或删除该任务:
celery.beat.remove_task(some_unique_task_id)
Run Code Online (Sandbox Code Playgroud)
要么
celery.beat.stop(some_unique_task_id)
Run Code Online (Sandbox Code Playgroud)
仅供参考我没有使用djcelery,它允许您通过django管理员管理定期任务.
Celery默认使用pickle作为任务的序列化方法.如常见问题解答中所述,这代表了一个安全漏洞.Celery允许您使用CELERY_TASK_SERIALIZER配置参数配置任务的序列化方式.
但这并不能解决安全问题.即使使用JSON或类似方法序列化任务,工作人员仍将使用pickle序列化执行插入队列的任务 - 它们只响应content-type消息中的参数.因此,任何可以写入任务队列的人都可以通过编写恶意的pickle对象来有效地显示工作进程.
如何防止工作线程运行使用pickle序列化的任务?
是否可以找出具有特定任务ID的任务是否存在?当我试图获得状态时,我将永远等待.
>>> AsyncResult('...').status
'PENDING'
Run Code Online (Sandbox Code Playgroud)
我想知道给定的任务ID是否是真正的芹菜任务ID而不是随机字符串.我想要不同的结果取决于是否有某个id的有效任务.
过去可能存在具有相同ID的有效任务,但结果可能已从后端删除.
我在这里看过这个例子:
http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
我的所有任务都在名为tasks.py的文件中.
更新芹菜并从示例中添加文件django抛出以下错误,无论我尝试什么:
ImportError: cannot import name Celery
Run Code Online (Sandbox Code Playgroud)
问题可能是由以下原因引起的吗?
app.autodiscover_tasks(settings.INSTALLED_APPS, related_name='tasks')
Run Code Online (Sandbox Code Playgroud)
因为它遍历所有具有以下导入的tasks.py文件.
from cloud.celery import app
Run Code Online (Sandbox Code Playgroud)
cloud/celery.py:
from __future__ import absolute_import
import os, sys
from celery import Celery
from celery.schedules import crontab
from django.conf import settings
BROKER_URL = 'redis://:PASSWORD@localhost'
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'cloud.settings')
app = Celery('cloud', broker=BROKER_URL)
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(settings.INSTALLED_APPS, related_name='tasks')
if "test" in sys.argv:
app.conf.update(
CELERY_ALWAYS_EAGER=True,
)
print >> sys.stderr, 'CELERY_ALWAYS_EAGER = True'
CELERYBEAT_SCHEDULE = {
'test_rabbit_running': {
"task": "retail.tasks.test_rabbit_running",
"schedule": 3600, #every hour
},
[..]
app.conf.update(
CELERYBEAT_SCHEDULE=CELERYBEAT_SCHEDULE
)
Run Code Online (Sandbox Code Playgroud)
retail/tasks.py …
在发布新版本以更新工作人员代码的同时,如何优雅地重新启动芹菜工作者?
编辑: 我打算做的是这样的事情.
我正在aws弹性beanstalk上启动一个django应用程序.我想运行后台任务或工作人员以便运行芹菜.
我找不到是否可能.如果是的话怎么可能实现?
这就是我现在正在做的事情,但这每次都会产生一个事件类型错误.
container_commands:
01_syncdb:
command: "django-admin.py syncdb --noinput"
leader_only: true
50_sqs_email:
command: "./manage.py celery worker --loglevel=info"
leader_only: true
Run Code Online (Sandbox Code Playgroud) 考虑到Celery已经是运行任务队列的堆栈的一部分(即它不仅仅是为了运行crons而添加,这似乎是一种矫枉过正的恕我直言).
它的"周期性任务"功能如何作为crontab的替代品有益?专门寻找以下几点.
django-celery,就像排队django任务一样.我正在尝试从以下doc中设置此基本示例:
但到目前为止,我一直得到以下错误:
AttributeError:'Flask'对象没有属性'user_options'
我正在使用芹菜3.1.15.
from celery import Celery
def make_celery(app):
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
Run Code Online (Sandbox Code Playgroud)
例:
from flask import Flask
app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)
@celery.task()
def add_together(a, b):
return a + b
Run Code Online (Sandbox Code Playgroud)
回溯错误:
Traceback (most recent call last):
File "/usr/local/bin/celery", line 11, in <module>
sys.exit(main())
File "/usr/local/lib/python2.7/dist-packages/celery/__main__.py", line 30, in …Run Code Online (Sandbox Code Playgroud) celery ×10
python ×7
django ×4
celeryd ×2
celerybeat ×1
flask ×1
importerror ×1
security ×1
status ×1
task ×1