我刚刚开始使用django-celery,我想将celeryd设置为守护进程.但是,这些说明似乎表明它一次只能配置一个站点/项目.芹菜可以处理多个项目,还是只能处理一个项目?而且,如果是这种情况,是否有一种干净的方法来设置celeryd为每个配置自动启动,这需要我为每个配置创建一个单独的init脚本?
我可以做芹菜自动重新加载本身时,有在模块的变化CELERY_IMPORTS中settings.py.
我试图让母模块检测甚至在子模块上的变化,但它没有检测到子模块的变化.这让我明白芹菜不能递归检测.我在文档中搜索了它,但我没有遇到任何对我的问题的回应.
在我的项目中添加与celery相关的所有内容CELERY_IMPORTS以检测更改时,我真的很烦.
有没有办法告诉芹菜"当项目的任何地方有任何变化时自动重新加载".
谢谢!
我的应用程序在页面上收集了一堆电话号码.一旦用户点击提交按钮,我创建一个芹菜任务来调用每个号码并给出一个提醒消息,然后将它们重定向到一个页面,在那里他们可以看到有关该呼叫的实时更新.我正在使用Web套接字来实时更新每个呼叫的状态,并且需要同步执行任务,因为我只能从一个号码拨出.
因此,一旦第一个呼叫/任务完成,我希望下一个呼叫/任务启动.
我看了一下CELERY_ALWAYS_EAGER设置,但它刚刚完成第一次迭代并停止了.
@task
def reminder(number):
# CODE THAT CALLS NUMBER HERE....
def make_calls(request):
for number in phone_numbers:
reminder.delay(number)
return redirect('live_call_updates')
Run Code Online (Sandbox Code Playgroud) 我有一个使用Django的Web应用程序,我正在使用Celery进行一些异步任务处理.
对于Celery,我使用Rabbitmq作为经纪人,而Redis作为结果后端.
Rabbitmq和Redis运行在本地虚拟机上托管的同一个Ubuntu 14.04服务器上.
Celery工作程序在远程计算机上运行(Windows 10)(没有工作程序在Django服务器上运行).
我有三个问题(我认为它们是以某种方式相关的!).
reject requeue = False:[WinError 10061]无法建立连接,因为目标计算机主动拒绝它
我也对我的设置感到困惑,我不知道这个问题可能来自哪里!
所以这是我到目前为止的设置:
# region Celery Settings
CELERY_CONCURRENCY = 1
CELERY_ACCEPT_CONTENT = ['json']
# CELERY_RESULT_BACKEND = 'redis://:C@pV@lue2016@cvc.ma:6379/0'
BROKER_URL = 'amqp://soufiaane:C@pV@lue2016@cvc.ma:5672/cvcHost'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_REDIS_HOST = 'cvc.ma'
CELERY_REDIS_PORT = 6379
CELERY_REDIS_DB = 0
CELERY_RESULT_BACKEND = 'redis'
CELERY_RESULT_PASSWORD = "C@pV@lue2016"
REDIS_CONNECT_RETRY = True
AMQP_SERVER = "cvc.ma"
AMQP_PORT = 5672
AMQP_USER = "soufiaane"
AMQP_PASSWORD = "C@pV@lue2016"
AMQP_VHOST = …Run Code Online (Sandbox Code Playgroud) 我用:
我可以在Django管理页面中看到我的所有任务,但目前它只有几个状态,例如:
这对我来说还不够.是否可以向管理页面添加有关正在运行的进程的更多详细信息?像进度条或完成的工作柜台等
我知道如何使用Celery日志记录功能,但出于某些原因,GUI在我的情况下更好.
那么,是否可以将一些跟踪信息发送到Django-Celery管理页面?
我有一个Django应用程序,使用Celery来卸载一些任务.主要是,它推迟了数据库表中某些字段的计算.
所以,我有一个tasks.py:
from models import MyModel
from celery import shared_task
@shared_task
def my_task(id):
qs = MyModel.objects.filter(some_field=id)
for record in qs:
my_value = #do some computations
record.my_field = my_value
record.save()
Run Code Online (Sandbox Code Playgroud)
在models.py中
from django.db import models
from tasks import my_task
class MyModel(models.Model):
field1 = models.IntegerField()
#more fields
my_field = models.FloatField(null=True)
@staticmethod
def load_from_file(file):
#parse file, set fields from file
my_task.delay(id)
Run Code Online (Sandbox Code Playgroud)
显然,由于循环导入(models导入tasks和tasks导入models),这不起作用.
我暂时通过调用my_task.delay()来解决这个问题views.py,但是将模型逻辑保留在模型类中似乎是有意义的.有没有更好的方法呢?
这是我的设置:
在我的settings.py文件中
BROKER_BACKEND = "djkombu.transport.DatabaseTransport"
Run Code Online (Sandbox Code Playgroud)
即我只是使用数据库来排队任务.
现在我的问题是:我有一个用户启动的任务,可能需要几分钟才能完成.我希望每个用户只运行一次任务,我会将任务的结果缓存在一个临时文件中,这样如果用户再次启动任务,我只需返回缓存的文件.我的视图函数中的代码如下所示:
task_id = "long-task-%d" % user_id
result = tasks.some_long_task.AsyncResult(task_id)
if result.state == celery.states.PENDING:
# The next line makes a duplicate task if the user rapidly refreshes the page
tasks.some_long_task.apply_async(task_id=task_id)
return HttpResponse("Task started...")
elif result.state == celery.states.STARTED:
return HttpResponse("Task is still running, please wait...")
elif result.state == celery.states.SUCCESS:
if cached_file_still_exists():
return get_cached_file()
else:
result.forget()
tasks.some_long_task.apply_async(task_id=task_id)
return HttpResponse("Task started...")
Run Code Online (Sandbox Code Playgroud)
这段代码几乎可行.但是当用户快速重新加载页面时,我遇到了问题.在任务排队和最终将任务从队列中拉出并提供给工作人员之间有1-3秒的延迟.在此期间,任务的状态仍为PENDING,这会导致视图逻辑启动重复任务.
我需要的是一些方法来判断任务是否已经提交到队列中,所以我最终不会提交两次.在芹菜中有这样做的标准方法吗?
是否可以在Celery的每个任务级别设置并发性(同时工作者数)?我正在寻找更细粒度的东西CELERYD_CONCURRENCY(设置整个守护进程的并发性).
使用场景是:我有一个celerlyd运行不同类型的任务具有非常不同的性能特征 - 一些是快速的,一些是非常慢的.对于一些人,我想尽可能快地尽可能多地做,对于其他人,我想确保在任何时候只有一个实例运行(即并发为1).
我尝试从命令行启动Celery worker服务器:
celery -A tasks worker --loglevel=info
Run Code Online (Sandbox Code Playgroud)
tasks.py中的代码:
import os
os.environ[ 'DJANGO_SETTINGS_MODULE' ] = "proj.settings"
from celery import task
@task()
def add_photos_task( lad_id ):
...
Run Code Online (Sandbox Code Playgroud)
我收到下一个错误:
Traceback (most recent call last):
File "/usr/local/bin/celery", line 8, in <module>
load_entry_point('celery==3.0.12', 'console_scripts', 'celery')()
File "/usr/local/lib/python2.7/site-packages/celery-3.0.12-py2.7.egg/celery/__main__.py", line 14, in main
main()
File "/usr/local/lib/python2.7/site-packages/celery-3.0.12-py2.7.egg/celery/bin/celery.py", line 946, in main
cmd.execute_from_commandline(argv)
File "/usr/local/lib/python2.7/site-packages/celery-3.0.12-py2.7.egg/celery/bin/celery.py", line 890, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "/usr/local/lib/python2.7/site-packages/celery-3.0.12-py2.7.egg/celery/bin/base.py", line 177, in execute_from_commandline
argv = self.setup_app_from_commandline(argv)
File "/usr/local/lib/python2.7/site-packages/celery-3.0.12-py2.7.egg/celery/bin/base.py", line 295, in setup_app_from_commandline
self.app = self.find_app(app) …Run Code Online (Sandbox Code Playgroud) 我对celery和django一般都是新手,所以请原谅我缺乏知识.我正在尝试运行测试来进行一些计算并等待测试完成,这样我就能确保完成正确的答案.
这是我有的:
在app/tests.py中
from tasks import *
c = calculate.apply_async(args=[1])
# wait until the task is done
while not calculate.AsyncResult(c.id).status == "SUCCESS":
print c.state
pass
Run Code Online (Sandbox Code Playgroud)
在app/tasks.py中
from celery import shared_task
@shared_task
def calculate(proj_id):
#some calculations followed by a save of the object
Run Code Online (Sandbox Code Playgroud)
即使在芹菜日志中它表示任务已成功完成,状态也永远不会从挂起更改
[2014-06-10 17:55:11,417: INFO/MainProcess] Received task: app.tasks.calculate[1f11e7ab-0add-42df-beac-3d94c6868aac]
[2014-06-10 17:55:11,505: INFO/MainProcess] Task app.tasks.calculate[1f11e7ab-0add-42df-beac-3d94c6868aac] succeeded in 0.0864518239978s: None
Run Code Online (Sandbox Code Playgroud)
我还在mainapp/settings.py中放了CELERY_IGNORE_RESULT = False,但这似乎没有做任何事情.