标签: python-rq

如何使用python-rq明确地给队列赋予优先级

我正在尝试python-rq,我不知道如何明确地给队列优先级?

优先级是否来自工作人员启动时定义的顺序?

rqworker queueA queueB queueC
Run Code Online (Sandbox Code Playgroud)

queueA优先于queueB和queueC?

python-rq

8
推荐指数
1
解决办法
1013
查看次数

python rq - 如何在完成多个其他作业时触发作业?多工作依赖工作?

我的 python redis 队列中有一个嵌套的作业结构。首先执行 rncopy 作业。完成后,接下来是 3 个相关的注册作业。当所有这 3 个作业的计算完成后,我想触发一个作业向我的前端发送 websocket 通知。

我目前的尝试:

    rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
    t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
    t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
    fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
    notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))
Run Code Online (Sandbox Code Playgroud)

不幸的是,似乎多作业依赖功能从未合并到主版本中。我看到目前 git 上有两个拉取请求。有我可以使用的解决方法吗?

很抱歉未能提供可重现的示例。

python queue redis python-3.x python-rq

8
推荐指数
1
解决办法
920
查看次数

如何在Python中正确捕获和处理RQ超时?

试图找到一种捕获RQ作业超时的好方法,因此可以在超时后重新排队.

基本上,正确的解决方案将提供一种方法(例如,工作者中的异常处理程序或类似的东西)来重新排队超时的作业.此外,如果作业返回failed队列,那也是一个很好的答案.

非常感谢!任何帮助将不胜感激!

python queue task-queue redis python-rq

7
推荐指数
1
解决办法
1534
查看次数

如何从队列中清除Django RQ作业?

我觉得有点愚蠢,但它似乎没有出现在RQ文档中.我有一个"失败"的队列,其中包含数千个项目,我想使用Django管理界面清除它.管理界面列出了它们并允许我单独删除和重新排队,但我不敢相信我必须潜入django shell来批量执行它.

我错过了什么?

django python-rq

6
推荐指数
2
解决办法
4602
查看次数

Python RQ:回调模式

我现在有大量文档要处理,并且正在使用 Python RQ 来并行化任务。

我希望完成一系列工作,因为对每个文档执行不同的操作。例如:A-> B->C表示将文档传递给 function AA完成后,继续B和 last C

然而,Python RQ 似乎并没有很好地支持管道的东西。

这是一个简单但有点脏的方法。一句话,流水线上的每个函数都以嵌套的方式调用它的下一个函数。

例如,对于管道A-> B-> C

在顶层,一些代码是这样写的:

q.enqueue(A, the_doc)

其中 q 是Queue实例,在函数中A有如下代码:

q.enqueue(B, the_doc)

在 中B,有这样的事情:

q.enqueue(C, the_doc)

还有比这更优雅的方式吗?例如ONE函数中的一些代码:

q.enqueue(A, the_doc) q.enqueue(B, the_doc, after = A) q.enqueue(C, the_doc, after= B)

depends_on参数是最接近我的要求的参数,但是,运行如下:

A_job = q.enqueue(A, the_doc) q.enqueue(B, depends_on=A_job )

不会工作。As 在q.enqueue(B, depends_on=A_job )被执行后立即A_job …

python parallel-processing python-rq

6
推荐指数
1
解决办法
3284
查看次数

类型错误:无法序列化“_io.TextIOWrapper”对象 - Flask

我正在编写一个烧瓶应用程序,要求用户上传 excel 电子表格,然后计算并填充数据库。我试图通过Redis RQ在后台进行处理部分,但我不断收到TypeError: cannot serialize '_io.TextIOWrapper' object my代码如下所示:

from redis import Redis
from rq import Queue
from rq.job import Job
import xlrd as x

workbook = x.open_workbook('data.xls')
sheet = workbook.sheet_by_index(0)
q = Queue(connection = Redis())

def populate(sheet,row,column):
    #extract data and save into database

job = enqueue_call(func=populate, args=(sheet,7,5), result_ttl = 5000)
print(job.get_id())
Run Code Online (Sandbox Code Playgroud)

task-queue redis flask python-rq

6
推荐指数
0
解决办法
3127
查看次数

排队调用中的rq超时参数不起作用,给出JobTimeoutException

我正在尝试更改RQ工作的超时时间,但是似乎没有任何效果。我有一些效果:

my_queue = Queue('my_task', connection=Redis())

job_args = (1, 2, 4)

my_queue.enqueue_call(
                    my_func,
                    args=job_args,
                    timeout=2700
            )
Run Code Online (Sandbox Code Playgroud)

但我仍然

JobTimeoutException: Job exceeded maximum timeout value (180 seconds)
Run Code Online (Sandbox Code Playgroud)

我非常拼命,甚至尝试进入rq模块queue.py并将超时的默认参数更改为2700和DEFAULT_TIMEOUT(在Queue类中定义的变量,其中包含enqueue_call方法)。我是否缺少任何东西?有人知道吗?谢谢!

python redis python-rq

6
推荐指数
2
解决办法
1953
查看次数

Celery vs RQ 基准测试

由于 celery 的可靠性和调度问题,我们决定评估替代方案。我一直在努力在两个消息队列解决方案之间建立一个关于基本性能的基准。

我目前的方法是在两个不同的队列中放置 1000 个任务(获取 nvie.com 并计算网站上的字数),并测量 4 celery(20 秒)与 4 rq 工人(70 秒)的速度。我的代码是https://github.com/swarchris8/celery-vs-rq-benchmark我通过命令行运行 celery,通过 Mac 上的主管运行 rq,从 vagrant 文件中可以清楚地看到 rq 的 Ubuntu 运行指令。

Celery 的性能要好得多,我不确定我测量队列清除速度的测试设置是否存在用于测量任务吞吐量的缺陷。我也在使用默认的 RQ 工人,我怀疑它可能会慢得多。

我的方法是在吞吐量方面对两个消息队列系统进行基准测试的正确方法吗?你采取了哪些方法?celery 比 RQ 快这么多吗?

performance message-queue celery python-3.x python-rq

6
推荐指数
0
解决办法
1210
查看次数

如何在 Docker 容器(Python、Flask 和 Redis)中启动自定义 RQ Worker

我遵循 Miguel Grinberg 的优秀 Flask Mega 教程,成功设置了一个带有 Redis 任务队列和 RQ 工作线程的 Flask Web 应用程序,所有这些都在 Docker 容器中。

为了提高任务队列性能,我现在需要使用自己的自定义工作线程,而不是默认的 RQ 工作线程。

不幸的是,我很难理解如何在 docker 中启动自定义工作线程。

要启动默认的 RQ Worker,Flask Mega 教程使用以下方法:使用“venv/bin/rq”覆盖 Docker 入口点,然后提供参数“worker -u redis://redis-server:6379/0 microblog-tasks” ”。

可执行文件名称由 --entrypoint 标志提供,而命令参数在容器映像名称之后的最后传递。

这是完整的命令 - 只有最后两行与这个问题相关。

$ docker run --name rq-worker -d --rm -e SECRET_KEY=my-secret-key \
-e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \
-e MAIL_USERNAME=<your-gmail-username> -e MAIL_PASSWORD=<your-gmail-password> \
--link mysql:dbserver --link redis:redis-server \
-e DATABASE_URL=mysql+pymysql://microblog:<database-password>@dbserver/microblog \
-e REDIS_URL=redis://redis-server:6379/0 \
--entrypoint venv/bin/rq \
microblog:latest worker -u redis://redis-server:6379/0 microblog-tasks …
Run Code Online (Sandbox Code Playgroud)

entry-point redis flask python-rq docker

6
推荐指数
1
解决办法
5105
查看次数

大内存Python后台作业

我正在运行Flask服务器,它将数据加载到MongoDB数据库中.由于存在大量数据,这需要很长时间,我想通过后台工作来完成这项工作.

我使用Redis作为消息代理和Python-rq来实现作业队列.所有代码都在Heroku上运行.

据我所知,python-rq使用pickle来序列化要执行的函数,包括参数,并将其与其他值一起添加到Redis哈希值.

由于参数包含要保存到数据库的信息,因此它非常大(约50MB),当它被序列化并保存到Redis时,不仅耗费了大量时间,而且还消耗了大量内存.Heris的Heroku计划仅售100美元,售价30美元.事实上我经常会遇到OOM错误:

OOM command not allowed when used memory > 'maxmemory'.

我有两个问题:

  1. python-rq是否适合这项任务,或者Celery的JSON序列化更合适?
  2. 有没有办法不序列化参数,而是引用它?

您对最佳解决方案的想法非常感谢!

python heroku mongodb flask python-rq

5
推荐指数
1
解决办法
401
查看次数