标签: django-celery

重试丢失或失败的任务(Celery,Django和RabbitMQ)

有没有办法确定是否有任何任务丢失并重试?


我认为丢失的原因可能是调度程序错误或工作线程崩溃.

我打算重试它们,但我不确定如何确定哪些任务需要退役?

以及如何自动完成此过程?我可以使用自己的自定义调度程序来创建新任务吗?

编辑:我从文档中发现RabbitMQ从未松散任务,但是当工作线程在任务执行过程中崩溃时会发生什么?

rabbitmq celery django-celery

16
推荐指数
1
解决办法
7420
查看次数

重试芹菜失败的任务是链条的一部分

我有一个芹菜链,可以完成一些任务.每个任务都可能失败并重试.请参阅下面的快速示例:

from celery import task

@task(ignore_result=True)
def add(x, y, fail=True):
    try:
        if fail:
            raise Exception('Ugly exception.')
        print '%d + %d = %d' % (x, y, x+y)
    except Exception as e:
        raise add.retry(args=(x, y, False), exc=e, countdown=10)

@task(ignore_result=True)
def mul(x, y):
    print '%d * %d = %d' % (x, y, x*y)
Run Code Online (Sandbox Code Playgroud)

和链:

from celery.canvas import chain
chain(add.si(1, 2), mul.si(3, 4)).apply_async()
Run Code Online (Sandbox Code Playgroud)

运行这两个任务(并假设没有任何失败),你会得到/看到打印:

1 + 2 = 3
3 * 4 = 12
Run Code Online (Sandbox Code Playgroud)

但是,当添加任务第一次失败并在后续重试调用中成功时,链中的其余任务不会运行,即添加任务失败,链中的所有其他任务都不会运行,几秒钟后,添加任务再次运行并成功,链中的其余任务(在本例中为mul.si(3,4))不会运行.

芹菜是否提供了从失败的任务继续失败链的方法?如果没有,那么实现这一目标的最佳方法是什么,并确保链的任务以指定的顺序运行,并且只有在前一个任务成功执行后,即使任务被重试几次?

注1:问题可以解决

add.delay(1, 2).get()
mul.delay(3, 4).get()
Run Code Online (Sandbox Code Playgroud)

但我有兴趣了解为什么链不能用于失败的任务.

celery celery-task django-celery

16
推荐指数
2
解决办法
5236
查看次数

停止主管不会阻止芹菜工人

当我停止Supervisord时,如何确保子流程停止?

我正在使用Supervisord来管理两名芹菜工人.每个工作人员的命令是:

command=/usr/local/myapp/src/manage.py celery worker --concurrency=1 --loglevel=INFO
Run Code Online (Sandbox Code Playgroud)

当我开始监督时,它们运行得很好,但是当我跑步时sudo service supervisord stop,它报告我的工人停止:

celery_1: stopped
celery_2: stopped
Run Code Online (Sandbox Code Playgroud)

但是,当我跑步时,ps aux | grep celery我看到它们仍在运行:

www-data 27667  0.0  1.5 318420 64360 ?        S    10:45   0:00 manage.py celery worker --concurrency=1 --loglevel=INFO
www-data 27668  0.0  1.5 318420 318416 ?        S    10:45   0:00 manage.py celery worker --concurrency=1 --loglevel=INFO
Run Code Online (Sandbox Code Playgroud)

python supervisord django-celery

16
推荐指数
2
解决办法
8709
查看次数

用于运行异步请求的Python Celery与线程库

我正在运行一个解析大量数据的python方法.由于它是时间密集的,我想在一个单独的线程上异步运行它,以便用户仍然可以访问网站/ UI.

如果用户退出站点或继续在服务器上运行,是否使用"来自线程导入线程"模块的线程终止?

使用Celery与仅仅使用线程模块这样的优势有什么好处?

python celery python-multithreading django-celery

16
推荐指数
1
解决办法
6182
查看次数

Celery - 没有名为 5 的模块

将 celery 和 django-celery 更新到 3.1 后:

$ pip freeze | grep celery
celery==3.1.18
django-celery==3.1.16
Run Code Online (Sandbox Code Playgroud)

启动服务器时遇到此错误:

Traceback (most recent call last):
  File "app/manage.py", line 16, in <module>
    execute_from_command_line(sys.argv)
  [...]
  File "/Users/xxx/.virtualenvs/yyy/lib/python2.7/site-packages/django/conf/__init__.py", line 42, in _setup
    self._wrapped = Settings(settings_module)
  File "/Users/xxx/.virtualenvs/yyy/lib/python2.7/site-packages/django/conf/__init__.py", line 95, in __init__
    raise ImportError("Could not import settings '%s' (Is it on sys.path?): %s" % (self.SETTINGS_MODULE, e))
ImportError: Could not import settings 'settings' (Is it on sys.path?): No module named five
Run Code Online (Sandbox Code Playgroud)

使用:

  • Django 1.4.21
  • 蟒蛇 2.7

python django celery python-2.7 django-celery

16
推荐指数
1
解决办法
2万
查看次数

芹菜任务和自定义装饰

我正在使用django和芹菜(django-celery)开展一个项目.我们的团队决定将所有数据访问代码(app-name)/manager.py包装在内(不要像管理员那样包装django),让(app-name)/task.py中的代码只处理汇编和执行芹菜任务(所以我们没有django此层中的ORM依赖项).

在我manager.py,我有这样的事情:

def get_tag(tag_name):
    ctype = ContentType.objects.get_for_model(Photo)
    try:
        tag = Tag.objects.get(name=tag_name)
    except ObjectDoesNotExist:
        return Tag.objects.none()
    return tag

def get_tagged_photos(tag):
    ctype = ContentType.objects.get_for_model(Photo)
    return TaggedItem.objects.filter(content_type__pk=ctype.pk, tag__pk=tag.pk)

def get_tagged_photos_count(tag):
    return get_tagged_photos(tag).count()
Run Code Online (Sandbox Code Playgroud)

在我的task.py中,我喜欢将它们包装成任务(然后可能使用这些任务来完成更复杂的任务),所以我写这个装饰器:

import manager #the module within same app containing data access functions

class mfunc_to_task(object):
    def __init__(mfunc_type='get'):
        self.mfunc_type = mfunc_type

    def __call__(self, f):
        def wrapper_f(*args, **kwargs):
            callback = kwargs.pop('callback', None)

            mfunc = getattr(manager, f.__name__)

            result = mfunc(*args, **kwargs)
            if callback:
                if self.mfunc_type == 'get':
                    subtask(callback).delay(result)
                elif …
Run Code Online (Sandbox Code Playgroud)

python django decorator celery django-celery

15
推荐指数
2
解决办法
1万
查看次数

如何在virtualenv中为Celery(django-celery)编写Ubuntu Upstart作业

我真的很喜欢使用暴发户.我目前有很多新手工作,可以在许多版本中运行不同的gunicorn实例.但是,我在互联网上找到的芹菜新贵脚本的2-3个例子对我不起作用.

因此,使用以下变量,如何编写Upstart作业以在virtualenv中运行django-celery.

Django项目之路:

/srv/projects/django_project
Run Code Online (Sandbox Code Playgroud)

这个项目的virtualenv的路径:

/srv/environments/django_project
Run Code Online (Sandbox Code Playgroud)

芹菜设置的路径是Django项目设置文件(django-celery):

/srv/projects/django_project/settings.py
Run Code Online (Sandbox Code Playgroud)

此Celery实例的日志文件的路径:

/srv/logs/celery.log
Run Code Online (Sandbox Code Playgroud)

对于此虚拟环境,用户:

iamtheuser
Run Code Online (Sandbox Code Playgroud)

和小组:

www-data
Run Code Online (Sandbox Code Playgroud)

我想用celerybeat运行Celery Daemon,因此,我要传递给django-admin.py(或manage.py)的命令是:

python manage.py celeryd -B
Run Code Online (Sandbox Code Playgroud)

如果脚本在gunicorn作业开始后启动,那将会更好,并且当gunicorn作业停止时停止.让我们说这个文件是:

/etc/init/gunicorn.conf
Run Code Online (Sandbox Code Playgroud)

django ubuntu upstart celery django-celery

15
推荐指数
1
解决办法
8781
查看次数

RabbitMQ中的celeryev队列变得非常大

我在rabbitmq上使用芹菜.我已经向队列发送了数千条消息,并且它们正在被成功处理,一切正常.但是,几个rabbitmq队列中的消息数量正在增长(队列中有数十万个项目).队列命名celeryev.[...](见下面的截图).这是恰当的行为吗?这些队列的目的是什么,不应该定期清除它们?有没有办法更频繁地清除它们,我认为它们占用了相当多的磁盘空间.

rabbitmq控件的截图

rabbitmq celery django-celery

15
推荐指数
4
解决办法
7335
查看次数

Celery:具有并发性和保留任务的工作程序仅运行1个任务

我的代码中的一些任务执行时间越来越长.

经过检查,我注意到虽然我将我的工作节点设置为并发6,并且存在6个进程以"正常工作",但只有1个任务显示在"正在运行的任务"下.这是一个小小的视觉证据:

以下是工人选项:

工人的选择,如花

这里是该工作人员的任务选项卡,只有一个正在运行的进程:

在此输入图像描述

我发现如果我重新启动celery,并发性再次受到尊重,我会看到> 1运行任务,但经过一段时间/任务后,它会恢复到这种行为.

解决这个间歇性问题的任何想法?

concurrency celery django-celery

15
推荐指数
1
解决办法
5913
查看次数

Django:如何在同一对象中提到的时间自动更改字段的值?

我正在为赛车活动开发一个django项目,其中数据库中的表有三个字段.

1)布尔字段,用于了解种族是否有效

2)比赛开始时间

3)比赛结束时间

在创建对象时,会指定start_time和end_time.如何在比赛开始时将布尔字段的值更改为True,并
在结束时将其更改为False?如何安排这些活动?

python django django-models celery django-celery

14
推荐指数
1
解决办法
6829
查看次数