有没有办法确定是否有任何任务丢失并重试?
我认为丢失的原因可能是调度程序错误或工作线程崩溃.
我打算重试它们,但我不确定如何确定哪些任务需要退役?
以及如何自动完成此过程?我可以使用自己的自定义调度程序来创建新任务吗?
编辑:我从文档中发现RabbitMQ从未松散任务,但是当工作线程在任务执行过程中崩溃时会发生什么?
我有一个芹菜链,可以完成一些任务.每个任务都可能失败并重试.请参阅下面的快速示例:
from celery import task
@task(ignore_result=True)
def add(x, y, fail=True):
try:
if fail:
raise Exception('Ugly exception.')
print '%d + %d = %d' % (x, y, x+y)
except Exception as e:
raise add.retry(args=(x, y, False), exc=e, countdown=10)
@task(ignore_result=True)
def mul(x, y):
print '%d * %d = %d' % (x, y, x*y)
Run Code Online (Sandbox Code Playgroud)
和链:
from celery.canvas import chain
chain(add.si(1, 2), mul.si(3, 4)).apply_async()
Run Code Online (Sandbox Code Playgroud)
运行这两个任务(并假设没有任何失败),你会得到/看到打印:
1 + 2 = 3
3 * 4 = 12
Run Code Online (Sandbox Code Playgroud)
但是,当添加任务第一次失败并在后续重试调用中成功时,链中的其余任务不会运行,即添加任务失败,链中的所有其他任务都不会运行,几秒钟后,添加任务再次运行并成功,链中的其余任务(在本例中为mul.si(3,4))不会运行.
芹菜是否提供了从失败的任务继续失败链的方法?如果没有,那么实现这一目标的最佳方法是什么,并确保链的任务以指定的顺序运行,并且只有在前一个任务成功执行后,即使任务被重试几次?
注1:问题可以解决
add.delay(1, 2).get()
mul.delay(3, 4).get()
Run Code Online (Sandbox Code Playgroud)
但我有兴趣了解为什么链不能用于失败的任务.
当我停止Supervisord时,如何确保子流程停止?
我正在使用Supervisord来管理两名芹菜工人.每个工作人员的命令是:
command=/usr/local/myapp/src/manage.py celery worker --concurrency=1 --loglevel=INFO
Run Code Online (Sandbox Code Playgroud)
当我开始监督时,它们运行得很好,但是当我跑步时sudo service supervisord stop,它报告我的工人停止:
celery_1: stopped
celery_2: stopped
Run Code Online (Sandbox Code Playgroud)
但是,当我跑步时,ps aux | grep celery我看到它们仍在运行:
www-data 27667 0.0 1.5 318420 64360 ? S 10:45 0:00 manage.py celery worker --concurrency=1 --loglevel=INFO
www-data 27668 0.0 1.5 318420 318416 ? S 10:45 0:00 manage.py celery worker --concurrency=1 --loglevel=INFO
Run Code Online (Sandbox Code Playgroud) 我正在运行一个解析大量数据的python方法.由于它是时间密集的,我想在一个单独的线程上异步运行它,以便用户仍然可以访问网站/ UI.
如果用户退出站点或继续在服务器上运行,是否使用"来自线程导入线程"模块的线程终止?
使用Celery与仅仅使用线程模块这样的优势有什么好处?
将 celery 和 django-celery 更新到 3.1 后:
$ pip freeze | grep celery
celery==3.1.18
django-celery==3.1.16
Run Code Online (Sandbox Code Playgroud)
启动服务器时遇到此错误:
Traceback (most recent call last):
File "app/manage.py", line 16, in <module>
execute_from_command_line(sys.argv)
[...]
File "/Users/xxx/.virtualenvs/yyy/lib/python2.7/site-packages/django/conf/__init__.py", line 42, in _setup
self._wrapped = Settings(settings_module)
File "/Users/xxx/.virtualenvs/yyy/lib/python2.7/site-packages/django/conf/__init__.py", line 95, in __init__
raise ImportError("Could not import settings '%s' (Is it on sys.path?): %s" % (self.SETTINGS_MODULE, e))
ImportError: Could not import settings 'settings' (Is it on sys.path?): No module named five
Run Code Online (Sandbox Code Playgroud)
使用:
我正在使用django和芹菜(django-celery)开展一个项目.我们的团队决定将所有数据访问代码(app-name)/manager.py包装在内(不要像管理员那样包装django),让(app-name)/task.py中的代码只处理汇编和执行芹菜任务(所以我们没有django此层中的ORM依赖项).
在我manager.py,我有这样的事情:
def get_tag(tag_name):
ctype = ContentType.objects.get_for_model(Photo)
try:
tag = Tag.objects.get(name=tag_name)
except ObjectDoesNotExist:
return Tag.objects.none()
return tag
def get_tagged_photos(tag):
ctype = ContentType.objects.get_for_model(Photo)
return TaggedItem.objects.filter(content_type__pk=ctype.pk, tag__pk=tag.pk)
def get_tagged_photos_count(tag):
return get_tagged_photos(tag).count()
Run Code Online (Sandbox Code Playgroud)
在我的task.py中,我喜欢将它们包装成任务(然后可能使用这些任务来完成更复杂的任务),所以我写这个装饰器:
import manager #the module within same app containing data access functions
class mfunc_to_task(object):
def __init__(mfunc_type='get'):
self.mfunc_type = mfunc_type
def __call__(self, f):
def wrapper_f(*args, **kwargs):
callback = kwargs.pop('callback', None)
mfunc = getattr(manager, f.__name__)
result = mfunc(*args, **kwargs)
if callback:
if self.mfunc_type == 'get':
subtask(callback).delay(result)
elif …Run Code Online (Sandbox Code Playgroud) 我真的很喜欢使用暴发户.我目前有很多新手工作,可以在许多版本中运行不同的gunicorn实例.但是,我在互联网上找到的芹菜新贵脚本的2-3个例子对我不起作用.
因此,使用以下变量,如何编写Upstart作业以在virtualenv中运行django-celery.
Django项目之路:
/srv/projects/django_project
Run Code Online (Sandbox Code Playgroud)
这个项目的virtualenv的路径:
/srv/environments/django_project
Run Code Online (Sandbox Code Playgroud)
芹菜设置的路径是Django项目设置文件(django-celery):
/srv/projects/django_project/settings.py
Run Code Online (Sandbox Code Playgroud)
此Celery实例的日志文件的路径:
/srv/logs/celery.log
Run Code Online (Sandbox Code Playgroud)
对于此虚拟环境,用户:
iamtheuser
Run Code Online (Sandbox Code Playgroud)
和小组:
www-data
Run Code Online (Sandbox Code Playgroud)
我想用celerybeat运行Celery Daemon,因此,我要传递给django-admin.py(或manage.py)的命令是:
python manage.py celeryd -B
Run Code Online (Sandbox Code Playgroud)
如果脚本在gunicorn作业开始后启动,那将会更好,并且当gunicorn作业停止时停止.让我们说这个文件是:
/etc/init/gunicorn.conf
Run Code Online (Sandbox Code Playgroud) 我在rabbitmq上使用芹菜.我已经向队列发送了数千条消息,并且它们正在被成功处理,一切正常.但是,几个rabbitmq队列中的消息数量正在增长(队列中有数十万个项目).队列命名celeryev.[...](见下面的截图).这是恰当的行为吗?这些队列的目的是什么,不应该定期清除它们?有没有办法更频繁地清除它们,我认为它们占用了相当多的磁盘空间.

我的代码中的一些任务执行时间越来越长.
经过检查,我注意到虽然我将我的工作节点设置为并发6,并且存在6个进程以"正常工作",但只有1个任务显示在"正在运行的任务"下.这是一个小小的视觉证据:
以下是工人选项:

这里是该工作人员的任务选项卡,只有一个正在运行的进程:

我发现如果我重新启动celery,并发性再次受到尊重,我会看到> 1运行任务,但经过一段时间/任务后,它会恢复到这种行为.
解决这个间歇性问题的任何想法?
我正在为赛车活动开发一个django项目,其中数据库中的表有三个字段.
1)布尔字段,用于了解种族是否有效
2)比赛开始时间
3)比赛结束时间
在创建对象时,会指定start_time和end_time.如何在比赛开始时将布尔字段的值更改为True,并
在结束时将其更改为False?如何安排这些活动?
django-celery ×10
celery ×9
python ×5
django ×4
rabbitmq ×2
celery-task ×1
concurrency ×1
decorator ×1
python-2.7 ×1
supervisord ×1
ubuntu ×1
upstart ×1