标签: celery

用芹菜设置特定任务的时间限制

我在Celery有一个任务,可以在正常运行的情况下运行10,000秒.但是,我的所有其余任务都应该在不到一秒的时间内完成.如何在不更改短期运行任务的时间限制的情况下为故意长时间运行的任务设置时间限制?

celery celeryd

45
推荐指数
2
解决办法
3万
查看次数

如何在不同的机器上安装芹菜工人?

我是celery的新手.我知道如何安装和运行一台服务器,但我需要将任务分配给多台机器.我的项目使用celery将传递给Web框架的用户请求分配给不同的机器,然后返回结果.我阅读了文档,但没有提到如何设置多台机器.我错过了什么?

python celery

44
推荐指数
1
解决办法
2万
查看次数

如何动态添加/删除周期性任务到芹菜(celerybeat)

如果我有一个定义如下的函数:

def add(x,y):
  return x+y
Run Code Online (Sandbox Code Playgroud)

有没有办法动态添加这个函数作为芹菜PeriodicTask并在运行时启动它?我希望能够做类似(伪代码)的事情:

some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)
Run Code Online (Sandbox Code Playgroud)

我还想用类似(伪代码)的东西动态地停止或删除该任务:

celery.beat.remove_task(some_unique_task_id)
Run Code Online (Sandbox Code Playgroud)

要么

celery.beat.stop(some_unique_task_id)
Run Code Online (Sandbox Code Playgroud)

仅供参考我没有使用djcelery,它允许您通过django管理员管理定期任务.

python celery celerybeat

43
推荐指数
4
解决办法
2万
查看次数

如何禁止芹菜中的泡菜系列化

Celery默认使用pickle作为任务的序列化方法.如常见问题解答中所述,这代表了一个安全漏洞.Celery允许您使用CELERY_TASK_SERIALIZER配置参数配置任务的序列化方式.

但这并不能解决安全问题.即使使用JSON或类似方法序列化任务,工作人员仍将使用pickle序列化执行插入队列的任务 - 它们只响应content-type消息中的参数.因此,任何可以写入任务队列的人都可以通过编写恶意的pickle对象来有效地显示工作进程.

如何防止工作线程运行使用pickle序列化的任务?

python security celery

41
推荐指数
3
解决办法
1万
查看次数

了解芹菜任务是否存在

是否可以找出具有特定任务ID的任务是否存在?当我试图获得状态时,我将永远等待.

>>> AsyncResult('...').status
'PENDING'
Run Code Online (Sandbox Code Playgroud)

我想知道给定的任务ID是否是真正的芹菜任务ID而不是随机字符串.我想要不同的结果取决于是否有某个id的有效任务.

过去可能存在具有相同ID的有效任务,但结果可能已从后端删除.

python status task celery

41
推荐指数
2
解决办法
2万
查看次数

Django,ImportError:无法导入名称Celery,可能循环导入?

我在这里看过这个例子:

http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

我的所有任务都在名为tasks.py的文件中.

更新芹菜并从示例中添加文件django抛出以下错误,无论我尝试什么:

ImportError: cannot import name Celery
Run Code Online (Sandbox Code Playgroud)

问题可能是由以下原因引起的吗?

app.autodiscover_tasks(settings.INSTALLED_APPS, related_name='tasks')
Run Code Online (Sandbox Code Playgroud)

因为它遍历所有具有以下导入的tasks.py文件.

from cloud.celery import app
Run Code Online (Sandbox Code Playgroud)

cloud/celery.py:

from __future__ import absolute_import

import os, sys
from celery import Celery
from celery.schedules import crontab
from django.conf import settings

BROKER_URL = 'redis://:PASSWORD@localhost'

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'cloud.settings')
app = Celery('cloud', broker=BROKER_URL)
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(settings.INSTALLED_APPS, related_name='tasks')

if "test" in sys.argv:
    app.conf.update(
        CELERY_ALWAYS_EAGER=True,
    )
    print >> sys.stderr, 'CELERY_ALWAYS_EAGER = True'

CELERYBEAT_SCHEDULE = {
    'test_rabbit_running': {
        "task": "retail.tasks.test_rabbit_running",
        "schedule": 3600, #every hour
    },
    [..]

app.conf.update(
    CELERYBEAT_SCHEDULE=CELERYBEAT_SCHEDULE
)
Run Code Online (Sandbox Code Playgroud)

retail/tasks.py …

python django importerror celery

41
推荐指数
8
解决办法
4万
查看次数

如何优雅地重新启动芹菜工人?

在发布新版本以更新工作人员代码的同时,如何优雅地重新启动芹菜工作者?

编辑: 我打算做的是这样的事情.

  • 工作正在运行,可能会将一个100 MB的文件上传到S3
  • 一个新的构建来了
  • 工作人员代码有变化
  • 构建脚本向工作人员发出信号
  • 使用新代码启动新工作人员
  • 在完成现有工作退出后获得信号的工人.

django celery celeryd

40
推荐指数
4
解决办法
4万
查看次数

如何使用AWS Elastic Beanstalk运行工作人员?

我正在aws弹性beanstalk上启动一个django应用程序.我想运行后台任务或工作人员以便运行芹菜.

我找不到是否可能.如果是的话怎么可能实现?

这就是我现在正在做的事情,但这每次都会产生一个事件类型错误.

container_commands:
  01_syncdb:
    command: "django-admin.py syncdb --noinput"
    leader_only: true
  50_sqs_email:
    command: "./manage.py celery worker --loglevel=info"
    leader_only: true
Run Code Online (Sandbox Code Playgroud)

django amazon-web-services celery amazon-elastic-beanstalk

40
推荐指数
2
解决办法
1万
查看次数

为什么使用Celery运行计划任务优于crontab?

考虑到Celery已经是运行任务队列的堆栈的一部分(即它不仅仅是为了运行crons而添加,这似乎是一种矫枉过正的恕我直言).

它的"周期性任务"功能如何作为crontab的替代品有益?专门寻找以下几点.

  • crontab的主要优点/缺点
  • 使用芹菜比crontab更好的选择
  • Django特定用例:Celery vs crontab运行基于django的周期性任务,当芹菜已被包含在堆栈中时django-celery,就像排队django任务一样.

python django celery django-celery

40
推荐指数
2
解决办法
1万
查看次数

AttributeError:'Flask'对象没有属性'user_options'

我正在尝试从以下doc中设置此基本示例:

http://flask.pocoo.org/docs/patterns/celery/

但到目前为止,我一直得到以下错误:

AttributeError:'Flask'对象没有属性'user_options'

我正在使用芹菜3.1.15.

from celery import Celery

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery
Run Code Online (Sandbox Code Playgroud)

例:

from flask import Flask

app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)


@celery.task()
def add_together(a, b):
    return a + b
Run Code Online (Sandbox Code Playgroud)

回溯错误:

Traceback (most recent call last):
  File "/usr/local/bin/celery", line 11, in <module>
    sys.exit(main())
  File "/usr/local/lib/python2.7/dist-packages/celery/__main__.py", line 30, in …
Run Code Online (Sandbox Code Playgroud)

python celery flask

39
推荐指数
2
解决办法
2万
查看次数