我对 celery 的记录器有问题。我有一个渲染帧的函数。我记录了我生成的子进程的输出,但似乎只有每个工作人员从队列中挑选的第一个作业被写入。队列中的所有后续任务都不会生成日志文件。我也尝试过使用 python 自己的日志记录,但发生了同样的问题。是否有我可能缺少的配置?
@task(queue='rndr')
def rndr(params):
path = get_logger_path(params)
logger = rndr.get_logger(logfile=path)
return render(params, logger)
Run Code Online (Sandbox Code Playgroud)
我以这种方式定义我的任务,因为我的重试记录器的定义不同,即 rndr_retry.get_logger ...
我的 celeryconfig 如下所示:
BROKER_HOST = "xxx.xxx.xxx.xxx"
BROKER_PORT = 5672
BROKER_USER = "xxxx"
BROKER_PASSWORD = "xxxx"
CELERY_RESULT_BACKEND = 'amqp'
CELERY_DISABLE_RATE_LIMITS = True
CELERY_ACKS_LATE = True
CELERY_IMPORTS = ['lib.tasks.concatenate', 'lib.tasks.encode', 'lib.tasks.render', 'lib.tasks.still_image', 'lib.tasks.retry']
CELERY_ROUTES = {'lib.tasks.encode':{'queue': 'encode'},
'lib.tasks.concatenate':{'queue': 'encode'},
'lib.tasks.still_image':{'queue': 'encode'},
'lib.tasks.render':{'queue':'rndr'},
'lib.tasks.retry':{'queue': 'retry'}
}
Run Code Online (Sandbox Code Playgroud)
希望有人能解释为什么只有队列中的第一个任务会写...
先感谢您。
更新:根据要求,这里是渲染方法的部分版本,没有所有细节......
def render(params, logger):
#load params to local values
try:
#create subprocess
output = child_proc.communicate()[0] …Run Code Online (Sandbox Code Playgroud) 我不知道从哪里开始诊断和解决这个问题:
$ bin/django celeryd -l DEBUG -v 3
-------------- celery@lucid32 v3.0.3 (Chiastic Slide)
---- **** -----
--- * *** * -- [Configuration]
-- * - **** --- . broker: django://localhost//
- ** ---------- . app: default:0x8b0aa4c (djcelery.loaders.DjangoLoader)
- ** ---------- . concurrency: 1 (processes)
- ** ---------- . events: OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery: exchange:celery(direct) binding:celery
--- ***** -----
[Tasks]
. celery.backend_cleanup
. celery.chain …Run Code Online (Sandbox Code Playgroud) 我想知道在将 celery 任务部署到工作人员后,有哪些选项可用于从浏览器监控 celery 任务?
我当前的应用程序堆栈是一个在扭曲内部运行的烧瓶应用程序,使用 celery 运行数十到数千个小型后台任务(更新存储库中的元数据,创建图像衍生品等)我设想使用 ajax 长轮询来监视状态由用户启动的 celery 任务。我将 redis 用于后端代理和结果。
我看到 celery 有一些命令行方法来监控任务,或者有一个 web 仪表板的花。但是,如果我想从发送到 celery 的特定任务中查看更详细的状态,那么该任务打印/写入日志文件,然后长轮询该文件以获取来自flask前端的更改是否更有意义?
此时用户可以说“更新这10,000个项目”,任务被发送到celery,前端很快就说,“作业已发送!”。而且任务确实完成了。但是我想让用户导航到“/status”并查看那 10,000 个小作业的状态 - 即使是滚动日志文件也可能会起作用。
任何建议将不胜感激。花了很多挠头,使这一步勾画出来的东西,但我纺纱我的车轮正好搞清楚什么从用户前端长查询。
我在 Celery 中有一个速率限制任务,但 Flower 中显示的结果使它看起来好像它正在立即处理一堆任务,然后遵守速率限制。为什么会发生这种情况?我需要做些什么来确保它从一开始就遵守速率限制吗?
@shared_task(rate_limit="4/m")
def my_task(a,b):
...
Run Code Online (Sandbox Code Playgroud)
...
my_task.apply_async((x, y,),)
Run Code Online (Sandbox Code Playgroud)
我无法理解使用 Celery 的优势是什么。我知道你可以将 Celery 与 Redis、RabbitMQ 等一起使用,但为什么我不直接获取这些消息队列服务的客户端,而不是将 Celery 放在它前面呢?
我想在 24 小时后删除一个实例,因为该实例已创建如何使用 celery
创建实例后如何启动“TIMER”?
我想要像 Snapchat 这样的东西
我有一个这样写的任务:
@async_runner.app.task(name='task_name')
def async_task():
async_runner.send_task(
task_fn=task_processing,
queue='queue_name',
options=async_runner.DEFAULT_RETRY_POLICY
)
Run Code Online (Sandbox Code Playgroud)
我的默认任务时间限制是 30 分钟。我想将此特定任务的时间限制增加到 1 小时。
如何为这一项任务设置不同的时间限制?
我已经看过这个,但我的问题是特定于 Flask 以及如何在 Flask 中配置 Celery。谢谢。
对于 Django 项目来说,创建基于函数的任务非常干净。只需在 django 应用程序中创建tasks.py并开始编写任务,就像这个示例一样,该示例取自官方celery文档:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
Run Code Online (Sandbox Code Playgroud)
但有时基于函数的任务是紧密耦合的并且可重用性不高。所以我想创建基于类的芹菜任务,该任务记录在官方网站中。在遵循https://github.com/celery/celery/issues/3874之后,我可以创建示例任务,但我不确定它是否是创建基于类的任务的正确方法。
from __future__ import absolute_import, unicode_literals
from celery import shared_task, Task
import time
from celery import current_app
@shared_task
def add(x, y):
time.sleep(5)
return x + y
@shared_task
def mul(x, y):
return x * y
# Sample class based task for testing
class AnotherTask(current_app.Task):
name = 'tasks.another_task'
def …Run Code Online (Sandbox Code Playgroud) 我正在按照手册中描述的方式启动 Celery 工人......
$ stoneid salimfadhley$ celery worker
-------------- celery@TCR-C02X29QPJHC9.local v4.2.1 (windowlicker)
---- **** -----
--- * *** * -- Darwin-18.2.0-x86_64-i386-64bit 2018-11-13 14:00:20
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: default:0x10fb4ee48 (.default.Loader)
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues] …Run Code Online (Sandbox Code Playgroud) 我想创建一个第三方聊天机器人 API,它是异步的,并在 10 秒暂停后回复“ok”。
import time
def wait():
time.sleep(10)
return "ok"
# views.py
def api(request):
return wait()
Run Code Online (Sandbox Code Playgroud)
我已经尝试了 celery,如下所示,我正在等待 celery 响应:
import time
from celery import shared_task
@shared_task
def wait():
time.sleep(10)
return "ok"
# views.py
def api(request):
a = wait.delay()
work = AsyncResult(a.id)
while True:
if work.ready():
return work.get(timeout=1)
Run Code Online (Sandbox Code Playgroud)
但是这个解决方案是同步工作的,没有区别。我们如何在不要求用户继续请求直到收到结果的情况下使其异步?