标签: django-celery

如何在Celery中检查任务状态?

如何检查任务是否在芹菜中运行(具体来说,我使用的是celery-django)?

我已经阅读了文档,而且我用谷歌搜索了,但我看不到像这样的电话:

my_example_task.state() == RUNNING
Run Code Online (Sandbox Code Playgroud)

我的用例是我有一个用于转码的外部(java)服务.当我发送要转码的文档时,我想检查运行该服务的任务是否正在运行,如果没有,则(重新)启动它.

我正在使用目前的稳定版本 - 2.4,我相信.

python web-services celery django-celery

85
推荐指数
7
解决办法
9万
查看次数

如何手动从shell运行芹菜定期任务?

我正在使用芹菜和django芹菜.我已经定义了一个我想测试的周期性任务.是否可以手动从shell运行周期性任务,以便查看控制台输出?

python django celery celery-task django-celery

68
推荐指数
3
解决办法
3万
查看次数

以指数退避重试Celery任务

对于这样的任务:

from celery.decorators import task

@task()
def add(x, y):
    if not x or not y:
        raise Exception("test error")
    return self.wait_until_server_responds(
Run Code Online (Sandbox Code Playgroud)

如果它抛出异常并且我想从守护进程端重试它,那么如何应用指数退避算法,即在等待2^2, 2^3,2^4秒之后?

也是从服务器端维护的重试,这样如果工作人员碰巧被杀死,那么产生的下一个工作人员将进行重试任务?

python celery django-celery

65
推荐指数
2
解决办法
2万
查看次数

Django Celery采伐最佳实践

我正在努力让Celery登录工作Django.我有登录设置settings.py进入控制台(这是我正在托管的工作正常Heroku).在每个模块的顶部,我有:

import logging
logger = logging.getLogger(__name__)
Run Code Online (Sandbox Code Playgroud)

在我的tasks.py中,我有:

from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
Run Code Online (Sandbox Code Playgroud)

这适用于从任务记录调用,我得到如下输出:

2012-11-13T18:05:38+00:00 app[worker.1]: [2012-11-13 18:05:38,527: INFO/PoolWorker-2] Syc feed is starting
Run Code Online (Sandbox Code Playgroud)

但是如果该任务然后调用另一个模块中的queryset方法,例如方法,我会得到重复的日志条目,例如

2012-11-13T18:00:51+00:00 app[worker.1]: [INFO] utils.generic_importers.ftp_processor process(): File xxx.csv already imported. Not downloaded
2012-11-13T18:00:51+00:00 app[worker.1]: [2012-11-13 18:00:51,736: INFO/PoolWorker-6] File xxx.csv already imported. Not downloaded
Run Code Online (Sandbox Code Playgroud)

我想我可以用

CELERY_HIJACK_ROOT_LOGGER = False
Run Code Online (Sandbox Code Playgroud)

只是使用Django日志记录但是当我尝试它时这不起作用,即使我确实让它工作,我也会失去"PoolWorker-6"我想要的那一点.(顺便说一句,我无法弄清楚如何在Celery的日志条目中显示任务名称,因为文档似乎表明它应该).

我怀疑我在这里缺少一些简单的东西.

python django logging celery django-celery

64
推荐指数
4
解决办法
4万
查看次数

celery无法使用redis

第一次尝试启动 Celery 但出现如下错误,我已经安装了 redis 并且它的启动正常,但仍然不知何故 django 似乎有问题,

File "<frozen importlib._bootstrap_external>", line 848, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/home/atif/Documents/celery_test/celery-env/lib/python3.8/site-packages/kombu/transport/redis.py", line 263, in <module>
    class PrefixedStrictRedis(GlobalKeyPrefixMixin, redis.Redis):
AttributeError: 'NoneType' object has no attribute 'Redis'
Run Code Online (Sandbox Code Playgroud)

芹菜.py

from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_test.settings')



    app = Celery('celery_test',)
    
    app.config_from_object('django.conf:settings')
    
    # Load task modules from all registered Django apps.
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
    
    @app.task(bind=True)
    def debug_task(self):
        print(f'Request: {self.request!r}')
Run Code Online (Sandbox Code Playgroud)

设置

#celery stuff ---------------
BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json' …
Run Code Online (Sandbox Code Playgroud)

django message-queue redis celery django-celery

59
推荐指数
2
解决办法
3万
查看次数

第一次更改主机名后,rabbitmq-server无法启动

我正在为我的django项目使用django-celery.最后一天我更改了我的计算机主机名(我正在使用Ubuntu 12.04,编辑文件'/ etc/hostname'),然后在下次重启后django-celery失败并出现错误

Consumer: Connection Error: [Errno 111] Connection refused. Trying again in 4 seconds...
Run Code Online (Sandbox Code Playgroud)

这个错误经过一番研究,我能找到的,改变我的主机名从导致该错误在这里.我的rabbitmq启动日志显示

file:/ var/log/rabbitmq/startup_log

Activating RabbitMQ plugins ...

********************************************************************************
********************************************************************************

0 plugins activated:

ERROR: epmd error for host "jinesh": nxdomain (non-existing domain)
Run Code Online (Sandbox Code Playgroud)

我的startup_err文件为空.

我跑的时候

root@jinesh:/home/jinesh# rabbitmqctl list_users
Listing users ...
Error: unable to connect to node rabbit@jinesh: nodedown

DIAGNOSTICS
===========

nodes in question: [rabbit@jinesh]

hosts, their running nodes and ports:
- unable to connect to epmd on jinesh: nxdomain

current node details:
- node …
Run Code Online (Sandbox Code Playgroud)

ubuntu rabbitmq django-celery

56
推荐指数
5
解决办法
6万
查看次数

检测Celery是否可用/正在运行

我正在使用Celery来管理异步任务.然而,偶尔芹菜过程会失效,导致任何任务都无法执行.我希望能够检查芹菜的状态并确保一切正常,如果我发现任何问题,则向用户显示错误消息.从Celery Worker文档看起来我可能能够使用ping或者检查这个,但ping感觉很乱,并且不清楚究竟如何使用inspect(如果inspect().registered()是空的?).

任何有关这方面的指导将不胜感激.基本上我正在寻找的方法是这样的:

def celery_is_alive():
    from celery.task.control import inspect
    return bool(inspect().registered()) # is this right??
Run Code Online (Sandbox Code Playgroud)

编辑:它看起来甚至不像celery 2.3.3上的registered()(即使2.1文档列出它).也许ping是正确的答案.

编辑:Ping似乎也没有做我认为会做的事情,所以仍然不确定这里的答案.

python django celery django-celery

48
推荐指数
5
解决办法
3万
查看次数

使用类方法作为芹菜任务

我正在尝试使用class的方法作为django-celery任务,使用@task装饰器进行标记.Anand Jeyahar问道,这里也描述同样的情况.就是这样的

class A:
    @task
    def foo(self, bar):
        ...

def main():
    a = A()
    ...
    # what i need
    a.foo.delay(bar) # executes as celery task 
    a.foo(bar) # executes locally
Run Code Online (Sandbox Code Playgroud)

问题是,即使我使用这样的类实例a.foo.delay(bar),它foo至少需要两个参数,这些self指针错过了指针.

更多信息:

  • 由于继承,我无法将类转换为模块
  • 方法强烈依赖于类成员,所以我不能使它们成为静态的
  • 标记与@task装饰任务,使一类任务本身,它可能是可以从执行该方法的run()方法,利用一些参数作为方法选择的关键,但它不正是我想要的.
  • 创建类的实例并将其作为self参数传递给方法会改变我执行方法的方式,而不是像celery taks那样,而是像通常的方法一样(即测试时)
  • 我试图找出如何在构造函数中以dinamically方式注册任务,但芹菜在工作者之间共享代码,这就是为什么它似乎是不可能的.

谢谢你的帮助!

python django-celery

42
推荐指数
3
解决办法
3万
查看次数

为什么使用Celery运行计划任务优于crontab?

考虑到Celery已经是运行任务队列的堆栈的一部分(即它不仅仅是为了运行crons而添加,这似乎是一种矫枉过正的恕我直言).

它的"周期性任务"功能如何作为crontab的替代品有益?专门寻找以下几点.

  • crontab的主要优点/缺点
  • 使用芹菜比crontab更好的选择
  • Django特定用例:Celery vs crontab运行基于django的周期性任务,当芹菜已被包含在堆栈中时django-celery,就像排队django任务一样.

python django celery django-celery

40
推荐指数
2
解决办法
1万
查看次数

如何将周期性任务发送到Celery中的特定队列

默认情况下,Celery将所有任务发送到"celery"队列,但您可以通过添加额外参数来更改此行为:

@task(queue='celery_periodic')
def recalc_last_hour():
    log.debug('sending new task')
    recalc_hour.delay(datetime(2013, 1, 1, 2)) # for example
Run Code Online (Sandbox Code Playgroud)

调度程序设置:

CELERYBEAT_SCHEDULE = {
   'installer_recalc_hour': {
        'task': 'stats.installer.tasks.recalc_last_hour',
        'schedule': 15  # every 15 sec for test
    },
}
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
Run Code Online (Sandbox Code Playgroud)

跑步工人:

python manage.py celery worker -c 1 -Q celery_periodic -B -E
Run Code Online (Sandbox Code Playgroud)

此方案无法按预期工作:此工作人员将定期任务发送到"芹菜"队列,而不是"celery_periodic".我该如何解决这个问题?

PS芹菜== 3.0.16

python celery django-celery

32
推荐指数
2
解决办法
1万
查看次数