如何检查任务是否在芹菜中运行(具体来说,我使用的是celery-django)?
我已经阅读了文档,而且我用谷歌搜索了,但我看不到像这样的电话:
my_example_task.state() == RUNNING
Run Code Online (Sandbox Code Playgroud)
我的用例是我有一个用于转码的外部(java)服务.当我发送要转码的文档时,我想检查运行该服务的任务是否正在运行,如果没有,则(重新)启动它.
我正在使用目前的稳定版本 - 2.4,我相信.
我正在使用Celery(3.0.15)和Redis作为经纪人.
是否有一种直接的方法来查询Celery队列中存在的具有给定名称的任务数量?
并且,作为后续,有没有办法取消Celery队列中存在的给定名称的所有任务?
我已经阅读了" 监控和管理指南",但没有看到解决方案.
我目前正在使用django和芹菜,一切正常.
但是,我希望能够通过检查当前安排的任务数量,让用户有机会在服务器过载时取消任务.
我怎样才能做到这一点?
我使用redis作为经纪人.
我刚刚发现: 在Celery中检索队列中的任务列表
它在某种程度上与我的问题有关,但我不需要列出任务,只计算它们:)
我在django项目中使用Celery,我的经纪人是RabbitMQ,我想检索队列的长度.我查看了Celery的代码,但没有找到工具来做到这一点.我在stackoverflow上发现了这个问题(从客户端检查RabbitMQ队列大小),但我觉得它并不令人满意.
一切都是在芹菜中设置的,所以应该有某种神奇的方法来检索我想要的东西,而不指定通道/连接.
有没有人对这个问题有任何想法?
谢谢 !
我需要允许用户提交非常大的工作请求.我们正在谈论100千兆字节的内存和20小时的计算时间.这花费了我们公司很多钱,因此规定任何时候只能运行2个作业,并且当2个已经运行时请求新作业将被拒绝(并且用户通知服务器正忙).
我当前的解决方案使用来自concurrent.futures的Executor,并且需要将Apache服务器设置为仅运行一个进程,从而降低响应速度(当前用户数非常低,因此现在可以正常使用).
如果可能的话我想使用Celery,但我没有在文档中看到任何方法来完成这个特定的设置.
如何在Django应用程序中在后台运行有限数量的作业,并在作业因服务器繁忙而被拒绝时通知用户?
我正在失去理智,试图找到一种可靠且可测试的方法来获取给定 Celery 队列中包含的任务数量。
我已经阅读了这两个相关的讨论:
注意:我没有使用 Django 或任何其他 Python Web 框架。
但我无法使用这些线程中描述的方法解决我的问题。
我使用 Redis 作为后端,但我希望有一个独立于后端且灵活的解决方案,特别是对于测试。
这是我目前的情况:我定义了一个EnhancedCelery类,它继承Celery并添加了几个方法,特别get_queue_size()是我正在尝试正确实现/测试的方法。
以下是我的测试用例中的代码:
celery_test_app = EnhancedCelery(__name__)
# this is needed to avoid exception for ping command
# which is automatically triggered by the worker once started
celery_test_app.loader.import_module('celery.contrib.testing.tasks')
# in memory backend
celery_test_app.conf.broker_url = 'memory://'
celery_test_app.conf.result_backend = 'cache+memory://'
# We have to setup queues manually,
# since it seems that auto queue creation doesn't work in …Run Code Online (Sandbox Code Playgroud) 我想使用celery configuration\api创建以下流程:
可能吗?怎么样?
我正在开发一个新的监控系统,该系统可以测量 Celery 队列吞吐量,并在队列备份时帮助向团队发出警报。在我的工作过程中,我遇到了一些我不理解的奇怪行为(并且在 Celery 规范中没有详细记录)。
出于测试目的,我设置了一个端点,该端点将使用 16 个可用于模拟备份队列的长时间运行的任务填充队列。框架是Flask,队列代理是Redis。Celery 配置为每个工作人员可以并行处理最多 4 个任务,而我有 2 个工作人员正在运行。
api/health.py
def health():
health = Blueprint("health", __name__)
@health.route("/api/debug/create-long-queue", methods=["GET"])
def long_queue():
for i in range(16):
sleepy_job.delay()
return make_response({}, 200)
return health
Run Code Online (Sandbox Code Playgroud)
工作.py
@celery.task(priority=HIGH_PRIORITY)
def sleepy_job(*args, **kwargs):
time.sleep(30)
Run Code Online (Sandbox Code Playgroud)
以下是我模拟备份生产队列的方法:
/api/debug/create-long-queue来模拟队列中的备份。根据上面的计算,每个工人应该忙着睡觉 1 分钟(总共可以同时处理 8 个任务。每个任务只休眠 30 秒,总共 16 个任务。)handle_incoming_message。这是我看到的使用花来检查队列的内容:
sleepy_job不到新任务的迹象 ,尽管我确信第二个 API 调用已被调用。handle_incoming_messagehandle_incoming_message.delay()sleepy_job任务完成后(约 30 秒),我 …celery ×8
python ×6
django ×3
redis ×3
asynchronous ×1
flower ×1
queue ×1
rabbitmq ×1
web-services ×1