标签: celery-task

从前端使用AJAX查询Celery以了解创建的任务是否已完成的最佳方法?

我正在使用Django和Celery + RabbitMQ来创建用户上传的视频的视频转换任务.现在我知道如何查询芹菜以获得状态.我的问题是在哪里保存与每个任务相关的task_id,我应该将它保存在模型中还是在django的缓存中?

我知道过去曾经问过类似的问题,例如这个stackoverflow问题,但还没有确定的答案.我知道这个问题的答案因开发人员的偏好而异,但如果有人能够教育不同方法的优缺点,那就太棒了.

为了重新迭代,我将使用task_id来经常从前端触发AJAX查询,以了解视频转换是否已完成.

django state celery celery-task

4
推荐指数
1
解决办法
3184
查看次数

Python Celery socket.error:[Errno 61]连接被拒绝

我正在使用Celery 3.0并具有如下配置文件.

celeryconfig.py

BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

CELERY_IMPORTS = ("tasks", )
CELERY_TASK_RESULT_EXPIRES = 300
Run Code Online (Sandbox Code Playgroud)

tasks.py

import celery

@celery.task
def function(x,y):
    return x + y
Run Code Online (Sandbox Code Playgroud)

和function.py

from tasks import function

print function.delay(4,4).get()
Run Code Online (Sandbox Code Playgroud)

我使用以下命令运行应用程序

celeryd --loglevel=INFO --config=celeryconfig
Run Code Online (Sandbox Code Playgroud)

一切都很好,直到现在.我有redis和芹菜跑步并得到答案.

但是当我从另一个名为parallelizer的文件中运行function命令时,

我收到套接字错误,

 socket.error: [Errno 61] Connection refused
Run Code Online (Sandbox Code Playgroud)

我的文件如下,

from examples.dummy.tasks import function
print function.delay(4,4).get()
Run Code Online (Sandbox Code Playgroud)

有任何想法吗 ?

python celery celery-task

4
推荐指数
2
解决办法
5933
查看次数

Celery + Gevent 池在执行 1000 多个任务后挂起

我们有8个核心,16 GB内存,运行celery的Linux服务器,它运行一个celery工作队列myQueue,并在gevent池下以1000并发运行。

执行任务大约 1 小时后,worker 突然冻结,它不接受新任务,这里celery beat 是我们对 celery 的配置

App =  Celery('tasks')
class Conf:
   BROKER_URL   = 'amqp://<user>:<pass>@<host>:<port>/<vhost>'
   CELERY_IGNORE_RESULT = True
   CELERY_IMPORTS = ("worker_class",)
   CELERYBEAT_SCHEDULE = {
       'RunTask':{
           'task': 'tasks.worker.MyWorker',
           'schedule' : timedelta(minutes=5)
       }
   }
Run Code Online (Sandbox Code Playgroud)

App.config_from_object(会议)

我们正在运行芹菜,如下所示

celery worker --workdir=tasks/ -A worker -P gevent -c 1000 -Q myQueue --loglevel=INFO
Run Code Online (Sandbox Code Playgroud)

还有人可以解释一下我如何使用 gevent 池吗celery multi

python celery gevent celery-task

4
推荐指数
1
解决办法
3560
查看次数

如何使用芹菜任务访问orm?

我正在尝试使用sqlalchemy + celery beats为我的数据库中的特定类型的对象翻转一个布尔标志.但是如何从tasks.py文件中访问我的orm?

from models import Book
from celery.decorators import periodic_task
from application import create_celery_app

celery = create_celery_app()
# Create celery: http://flask.pocoo.org/docs/0.10/patterns/celery/

# This task works fine
@celery.task
def celery_send_email(to,subject,template):
    with current_app.app_context():
        msg = Message(
            subject,
            recipients=[to],
            html=template,
            sender=current_app.config['MAIL_DEFAULT_SENDER']
        )
        return mail.send(msg)

#This fails
@periodic_task(name='release_flag',run_every=timedelta(seconds=10))
def release_flag():
    with current_app.app_context(): <<< #Fails on this line
        books = Book.query.all() <<<< #Fails here too
        for book in books:
          book.read = True
          book.save()
Run Code Online (Sandbox Code Playgroud)

我正在使用芹菜beat命令运行:

celery -A task worker -l INFO --beat

但是我收到以下错误:

raise …
Run Code Online (Sandbox Code Playgroud)

celery flask celery-task flask-sqlalchemy celerybeat

4
推荐指数
1
解决办法
1753
查看次数

使主管正确停止芹菜工人

我在使用芹菜时遇到了很多奇怪的事情。比如,我更新tasks.py,supervisorctl reload(重启),但是tasks错了。有些任务似乎消失了等等。
今天我发现,因为supervisorctl stop all不能阻止所有的芹菜工人。并且只有 kill -9 'pgrep python' 可以将它们全部杀死。

情况:

    root@ubuntu12:/data/www/article_fetcher# supervisorctl
    celery_beat                      RUNNING    pid 29597, uptime 0:52:18
    celery_worker1                   RUNNING    pid 29556, uptime 0:52:20
    celery_worker2                   RUNNING    pid 29570, uptime 0:52:19
    celery_worker3                   RUNNING    pid 29557, uptime 0:52:20
    celery_worker4                   RUNNING    pid 29586, uptime 0:52:18
    uwsgi                            RUNNING    pid 29604, uptime 0:52:18
    supervisor> stop all
    celery_beat: stopped
    celery_worker2: stopped
    celery_worker4: stopped
    celery_worker3: stopped
    uwsgi: stopped
    celery_worker1: stopped
    supervisor> status
    celery_beat                      STOPPED    Aug 04 11:05 AM
    celery_worker1                   STOPPED    Aug 04 …
Run Code Online (Sandbox Code Playgroud)

python celery-task supervisord django-celery django-supervisor

4
推荐指数
1
解决办法
4004
查看次数

芹菜花仪表板移除工人

我最近安装了 flower 0.8.4,我在 Digital Ocean 中创建了一个快照,然后使用这个快照启动了一个新服务器。现在花还在显示老工人的名字。如何在保留现有工作人员信息的同时删除此工作人员。

芹菜开始:

worker -n celeryd@%h -f /usr/local/src/imbue/application/imbue/log/celeryd.log --loglevel=DEBUG --autoscale=50,10
Run Code Online (Sandbox Code Playgroud)

花:

/usr/bin/python /usr/local/bin/flower --basic_auth=root:password --port=8082 --broker=amqp://user:password@rabbitmq:5672// --persistent=True --db=/usr/local/src/imbue/application/imbue/log/flower.db
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

python celery celery-task flower

4
推荐指数
2
解决办法
3480
查看次数

哪个池类应该在芹菜中使用prefork,eventlet或gevent?

我有3个远程工作者,每个人都使用默认池(prefork)和单个任务运行.

单个任务需要2到5分钟才能完成,因为它在许多不同的工具上运行并在ELK中插入数据库.

worker命令: celery -A project worker -l info

我应该使用哪个池类来加快处理速度?

有没有其他方法来提高性能?

python celery celery-task

4
推荐指数
1
解决办法
2348
查看次数

Celery 任务显示名称

我有一些 Celery 任务,我通过使用任务 ID 实例化 AsyncResult 来定期从单独的进程检查它们的状态。

由此,我得到了任务状态。通过实现自定义结果后端和扩展 AsyncResult 类,如here所述,我也可以获取任务名称。但是,我希望能够为每个任务获取自定义显示名称 - 人类可读的内容,以便我可以以用户友好的方式显示状态信息。

假设,它可能设置为:

@app.task()
def my_task(args):
    display_name = "My Task"
    ...
    ...
Run Code Online (Sandbox Code Playgroud)

后来我会做...

result = ExtendedAsyncResult(task_id)
result.display_name
Run Code Online (Sandbox Code Playgroud)

但是从我链接到的自定义结果后端来看,似乎没有任何方法可以访问任务的局部变量。

有没有办法实现我正在寻找的东西?

celery celery-task

4
推荐指数
1
解决办法
3925
查看次数

运行 celery 周期性任务 1 小时,需要超过 1 小时,该任务过期吗?

我有一个定期的数据清除任务,每天运行一次。

我希望该任务只运行 1 小时。如果处理该任务的持续时间超过 1 小时,则使该任务过期。第二天将再次运行 1 小时。

我想这样做是因为如果流量很高,那么该特定的 cere 任务会持续运行 10-15 小时。

python django celery celery-task celerybeat

4
推荐指数
1
解决办法
3930
查看次数

Flask 错误:无法加载 celery 应用程序

请帮我解决这个问题 我在跑步时遇到了这个问题

celery -A app.celery worker --loglevel=info
Run Code Online (Sandbox Code Playgroud)

错误:

Unable to load celery application.
The module app.celery was not found.
Run Code Online (Sandbox Code Playgroud)

我的代码是——

#  Celery Configuration
from celery import Celery
from app import app
print("App Name=",app.import_name)

celery=Celery(app.name,broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def download_content():
    return "hello"
Run Code Online (Sandbox Code Playgroud)

目录结构-- newYoutube/app/auth/routes.py 并且此功能存在于 routes.py 中 auth is blueprint。

python celery flask celery-task python-3.x

4
推荐指数
1
解决办法
5585
查看次数