我正在使用Django和Celery + RabbitMQ来创建用户上传的视频的视频转换任务.现在我知道如何查询芹菜以获得状态.我的问题是在哪里保存与每个任务相关的task_id,我应该将它保存在模型中还是在django的缓存中?
我知道过去曾经问过类似的问题,例如这个stackoverflow问题,但还没有确定的答案.我知道这个问题的答案因开发人员的偏好而异,但如果有人能够教育不同方法的优缺点,那就太棒了.
为了重新迭代,我将使用task_id来经常从前端触发AJAX查询,以了解视频转换是否已完成.
我正在使用Celery 3.0并具有如下配置文件.
celeryconfig.py
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_IMPORTS = ("tasks", )
CELERY_TASK_RESULT_EXPIRES = 300
Run Code Online (Sandbox Code Playgroud)
tasks.py
import celery
@celery.task
def function(x,y):
return x + y
Run Code Online (Sandbox Code Playgroud)
和function.py
from tasks import function
print function.delay(4,4).get()
Run Code Online (Sandbox Code Playgroud)
我使用以下命令运行应用程序
celeryd --loglevel=INFO --config=celeryconfig
Run Code Online (Sandbox Code Playgroud)
一切都很好,直到现在.我有redis和芹菜跑步并得到答案.
但是当我从另一个名为parallelizer的文件中运行function命令时,
我收到套接字错误,
socket.error: [Errno 61] Connection refused
Run Code Online (Sandbox Code Playgroud)
我的文件如下,
from examples.dummy.tasks import function
print function.delay(4,4).get()
Run Code Online (Sandbox Code Playgroud)
有任何想法吗 ?
我们有8个核心,16 GB内存,运行celery的Linux服务器,它运行一个celery工作队列myQueue,并在gevent池下以1000并发运行。
执行任务大约 1 小时后,worker 突然冻结,它不接受新任务,这里celery beat
是我们对 celery 的配置
App = Celery('tasks')
class Conf:
BROKER_URL = 'amqp://<user>:<pass>@<host>:<port>/<vhost>'
CELERY_IGNORE_RESULT = True
CELERY_IMPORTS = ("worker_class",)
CELERYBEAT_SCHEDULE = {
'RunTask':{
'task': 'tasks.worker.MyWorker',
'schedule' : timedelta(minutes=5)
}
}
Run Code Online (Sandbox Code Playgroud)
App.config_from_object(会议)
我们正在运行芹菜,如下所示
celery worker --workdir=tasks/ -A worker -P gevent -c 1000 -Q myQueue --loglevel=INFO
Run Code Online (Sandbox Code Playgroud)
还有人可以解释一下我如何使用 gevent 池吗celery multi
我正在尝试使用sqlalchemy + celery beats为我的数据库中的特定类型的对象翻转一个布尔标志.但是如何从tasks.py文件中访问我的orm?
from models import Book
from celery.decorators import periodic_task
from application import create_celery_app
celery = create_celery_app()
# Create celery: http://flask.pocoo.org/docs/0.10/patterns/celery/
# This task works fine
@celery.task
def celery_send_email(to,subject,template):
with current_app.app_context():
msg = Message(
subject,
recipients=[to],
html=template,
sender=current_app.config['MAIL_DEFAULT_SENDER']
)
return mail.send(msg)
#This fails
@periodic_task(name='release_flag',run_every=timedelta(seconds=10))
def release_flag():
with current_app.app_context(): <<< #Fails on this line
books = Book.query.all() <<<< #Fails here too
for book in books:
book.read = True
book.save()
Run Code Online (Sandbox Code Playgroud)
我正在使用芹菜beat命令运行:
celery -A task worker -l INFO --beat
但是我收到以下错误:
raise …Run Code Online (Sandbox Code Playgroud) 我在使用芹菜时遇到了很多奇怪的事情。比如,我更新tasks.py,supervisorctl reload(重启),但是tasks错了。有些任务似乎消失了等等。
今天我发现,因为supervisorctl stop all不能阻止所有的芹菜工人。并且只有 kill -9 'pgrep python' 可以将它们全部杀死。
情况:
root@ubuntu12:/data/www/article_fetcher# supervisorctl
celery_beat RUNNING pid 29597, uptime 0:52:18
celery_worker1 RUNNING pid 29556, uptime 0:52:20
celery_worker2 RUNNING pid 29570, uptime 0:52:19
celery_worker3 RUNNING pid 29557, uptime 0:52:20
celery_worker4 RUNNING pid 29586, uptime 0:52:18
uwsgi RUNNING pid 29604, uptime 0:52:18
supervisor> stop all
celery_beat: stopped
celery_worker2: stopped
celery_worker4: stopped
celery_worker3: stopped
uwsgi: stopped
celery_worker1: stopped
supervisor> status
celery_beat STOPPED Aug 04 11:05 AM
celery_worker1 STOPPED Aug 04 …Run Code Online (Sandbox Code Playgroud) python celery-task supervisord django-celery django-supervisor
我最近安装了 flower 0.8.4,我在 Digital Ocean 中创建了一个快照,然后使用这个快照启动了一个新服务器。现在花还在显示老工人的名字。如何在保留现有工作人员信息的同时删除此工作人员。
芹菜开始:
worker -n celeryd@%h -f /usr/local/src/imbue/application/imbue/log/celeryd.log --loglevel=DEBUG --autoscale=50,10
Run Code Online (Sandbox Code Playgroud)
花:
/usr/bin/python /usr/local/bin/flower --basic_auth=root:password --port=8082 --broker=amqp://user:password@rabbitmq:5672// --persistent=True --db=/usr/local/src/imbue/application/imbue/log/flower.db
Run Code Online (Sandbox Code Playgroud)
我有3个远程工作者,每个人都使用默认池(prefork)和单个任务运行.
单个任务需要2到5分钟才能完成,因为它在许多不同的工具上运行并在ELK中插入数据库.
worker命令: celery -A project worker -l info
我应该使用哪个池类来加快处理速度?
有没有其他方法来提高性能?
我有一些 Celery 任务,我通过使用任务 ID 实例化 AsyncResult 来定期从单独的进程检查它们的状态。
由此,我得到了任务状态。通过实现自定义结果后端和扩展 AsyncResult 类,如here所述,我也可以获取任务名称。但是,我希望能够为每个任务获取自定义显示名称 - 人类可读的内容,以便我可以以用户友好的方式显示状态信息。
假设,它可能设置为:
@app.task()
def my_task(args):
display_name = "My Task"
...
...
Run Code Online (Sandbox Code Playgroud)
后来我会做...
result = ExtendedAsyncResult(task_id)
result.display_name
Run Code Online (Sandbox Code Playgroud)
但是从我链接到的自定义结果后端来看,似乎没有任何方法可以访问任务的局部变量。
有没有办法实现我正在寻找的东西?
我有一个定期的数据清除任务,每天运行一次。
我希望该任务只运行 1 小时。如果处理该任务的持续时间超过 1 小时,则使该任务过期。第二天将再次运行 1 小时。
我想这样做是因为如果流量很高,那么该特定的 cere 任务会持续运行 10-15 小时。
请帮我解决这个问题 我在跑步时遇到了这个问题
celery -A app.celery worker --loglevel=info
Run Code Online (Sandbox Code Playgroud)
错误:
Unable to load celery application.
The module app.celery was not found.
Run Code Online (Sandbox Code Playgroud)
我的代码是——
# Celery Configuration
from celery import Celery
from app import app
print("App Name=",app.import_name)
celery=Celery(app.name,broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@celery.task
def download_content():
return "hello"
Run Code Online (Sandbox Code Playgroud)
目录结构-- newYoutube/app/auth/routes.py 并且此功能存在于 routes.py 中 auth is blueprint。
celery-task ×10
celery ×9
python ×7
celerybeat ×2
django ×2
flask ×2
flower ×1
gevent ×1
python-3.x ×1
state ×1
supervisord ×1