如何检查任务是否在芹菜中运行(具体来说,我使用的是celery-django)?
我已经阅读了文档,而且我用谷歌搜索了,但我看不到像这样的电话:
my_example_task.state() == RUNNING
Run Code Online (Sandbox Code Playgroud)
我的用例是我有一个用于转码的外部(java)服务.当我发送要转码的文档时,我想检查运行该服务的任务是否正在运行,如果没有,则(重新)启动它.
我正在使用目前的稳定版本 - 2.4,我相信.
我正在使用芹菜和django芹菜.我已经定义了一个我想测试的周期性任务.是否可以手动从shell运行周期性任务,以便查看控制台输出?
对于这样的任务:
from celery.decorators import task
@task()
def add(x, y):
if not x or not y:
raise Exception("test error")
return self.wait_until_server_responds(
Run Code Online (Sandbox Code Playgroud)
如果它抛出异常并且我想从守护进程端重试它,那么如何应用指数退避算法,即在等待2^2, 2^3,2^4秒之后?
也是从服务器端维护的重试,这样如果工作人员碰巧被杀死,那么产生的下一个工作人员将进行重试任务?
我正在努力让Celery登录工作Django.我有登录设置settings.py进入控制台(这是我正在托管的工作正常Heroku).在每个模块的顶部,我有:
import logging
logger = logging.getLogger(__name__)
Run Code Online (Sandbox Code Playgroud)
在我的tasks.py中,我有:
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
Run Code Online (Sandbox Code Playgroud)
这适用于从任务记录调用,我得到如下输出:
2012-11-13T18:05:38+00:00 app[worker.1]: [2012-11-13 18:05:38,527: INFO/PoolWorker-2] Syc feed is starting
Run Code Online (Sandbox Code Playgroud)
但是如果该任务然后调用另一个模块中的queryset方法,例如方法,我会得到重复的日志条目,例如
2012-11-13T18:00:51+00:00 app[worker.1]: [INFO] utils.generic_importers.ftp_processor process(): File xxx.csv already imported. Not downloaded
2012-11-13T18:00:51+00:00 app[worker.1]: [2012-11-13 18:00:51,736: INFO/PoolWorker-6] File xxx.csv already imported. Not downloaded
Run Code Online (Sandbox Code Playgroud)
我想我可以用
CELERY_HIJACK_ROOT_LOGGER = False
Run Code Online (Sandbox Code Playgroud)
只是使用Django日志记录但是当我尝试它时这不起作用,即使我确实让它工作,我也会失去"PoolWorker-6"我想要的那一点.(顺便说一句,我无法弄清楚如何在Celery的日志条目中显示任务名称,因为文档似乎表明它应该).
我怀疑我在这里缺少一些简单的东西.
第一次尝试启动 Celery 但出现如下错误,我已经安装了 redis 并且它的启动正常,但仍然不知何故 django 似乎有问题,
File "<frozen importlib._bootstrap_external>", line 848, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/home/atif/Documents/celery_test/celery-env/lib/python3.8/site-packages/kombu/transport/redis.py", line 263, in <module>
class PrefixedStrictRedis(GlobalKeyPrefixMixin, redis.Redis):
AttributeError: 'NoneType' object has no attribute 'Redis'
Run Code Online (Sandbox Code Playgroud)
芹菜.py
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_test.settings')
app = Celery('celery_test',)
app.config_from_object('django.conf:settings')
# Load task modules from all registered Django apps.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
Run Code Online (Sandbox Code Playgroud)
设置
#celery stuff ---------------
BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json' …Run Code Online (Sandbox Code Playgroud) 我正在为我的django项目使用django-celery.最后一天我更改了我的计算机主机名(我正在使用Ubuntu 12.04,编辑文件'/ etc/hostname'),然后在下次重启后django-celery失败并出现错误
Consumer: Connection Error: [Errno 111] Connection refused. Trying again in 4 seconds...
Run Code Online (Sandbox Code Playgroud)
这个错误经过一番研究,我能找到的,改变我的主机名从导致该错误在这里.我的rabbitmq启动日志显示
file:/ var/log/rabbitmq/startup_log
Activating RabbitMQ plugins ...
********************************************************************************
********************************************************************************
0 plugins activated:
ERROR: epmd error for host "jinesh": nxdomain (non-existing domain)
Run Code Online (Sandbox Code Playgroud)
我的startup_err文件为空.
我跑的时候
root@jinesh:/home/jinesh# rabbitmqctl list_users
Listing users ...
Error: unable to connect to node rabbit@jinesh: nodedown
DIAGNOSTICS
===========
nodes in question: [rabbit@jinesh]
hosts, their running nodes and ports:
- unable to connect to epmd on jinesh: nxdomain
current node details:
- node …Run Code Online (Sandbox Code Playgroud) 我正在使用Celery来管理异步任务.然而,偶尔芹菜过程会失效,导致任何任务都无法执行.我希望能够检查芹菜的状态并确保一切正常,如果我发现任何问题,则向用户显示错误消息.从Celery Worker文档看起来我可能能够使用ping或者检查这个,但ping感觉很乱,并且不清楚究竟如何使用inspect(如果inspect().registered()是空的?).
任何有关这方面的指导将不胜感激.基本上我正在寻找的方法是这样的:
def celery_is_alive():
from celery.task.control import inspect
return bool(inspect().registered()) # is this right??
Run Code Online (Sandbox Code Playgroud)
编辑:它看起来甚至不像celery 2.3.3上的registered()(即使2.1文档列出它).也许ping是正确的答案.
编辑:Ping似乎也没有做我认为会做的事情,所以仍然不确定这里的答案.
我正在尝试使用class的方法作为django-celery任务,使用@task装饰器进行标记.Anand Jeyahar问道,这里也描述了同样的情况.就是这样的
class A:
@task
def foo(self, bar):
...
def main():
a = A()
...
# what i need
a.foo.delay(bar) # executes as celery task
a.foo(bar) # executes locally
Run Code Online (Sandbox Code Playgroud)
问题是,即使我使用这样的类实例a.foo.delay(bar),它foo至少需要两个参数,这些self指针错过了指针.
更多信息:
run()方法,利用一些参数作为方法选择的关键,但它不正是我想要的.self参数传递给方法会改变我执行方法的方式,而不是像celery taks那样,而是像通常的方法一样(即测试时)谢谢你的帮助!
考虑到Celery已经是运行任务队列的堆栈的一部分(即它不仅仅是为了运行crons而添加,这似乎是一种矫枉过正的恕我直言).
它的"周期性任务"功能如何作为crontab的替代品有益?专门寻找以下几点.
django-celery,就像排队django任务一样.默认情况下,Celery将所有任务发送到"celery"队列,但您可以通过添加额外参数来更改此行为:
@task(queue='celery_periodic')
def recalc_last_hour():
log.debug('sending new task')
recalc_hour.delay(datetime(2013, 1, 1, 2)) # for example
Run Code Online (Sandbox Code Playgroud)
调度程序设置:
CELERYBEAT_SCHEDULE = {
'installer_recalc_hour': {
'task': 'stats.installer.tasks.recalc_last_hour',
'schedule': 15 # every 15 sec for test
},
}
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
Run Code Online (Sandbox Code Playgroud)
跑步工人:
python manage.py celery worker -c 1 -Q celery_periodic -B -E
Run Code Online (Sandbox Code Playgroud)
此方案无法按预期工作:此工作人员将定期任务发送到"芹菜"队列,而不是"celery_periodic".我该如何解决这个问题?
PS芹菜== 3.0.16
django-celery ×10
celery ×8
python ×8
django ×5
celery-task ×1
logging ×1
rabbitmq ×1
redis ×1
ubuntu ×1
web-services ×1