flo*_*ian 8 python queue redis python-3.x python-rq
我的 python redis 队列中有一个嵌套的作业结构。首先执行 rncopy 作业。完成后,接下来是 3 个相关的注册作业。当所有这 3 个作业的计算完成后,我想触发一个作业向我的前端发送 websocket 通知。
我目前的尝试:
rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))
Run Code Online (Sandbox Code Playgroud)
不幸的是,似乎多作业依赖功能从未合并到主版本中。我看到目前 git 上有两个拉取请求。有我可以使用的解决方法吗?
很抱歉未能提供可重现的示例。
新版本(RQ >= 1.8)
您可以简单地使用depends_on
参数,传递列表或元组。
rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
notify = redisqueue.enqueue(first_wrapper, patient_finished, patientid,t2_reg.id,fla_reg.id, timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))
# you can also use a list instead of a tuple:
# notify = redisqueue.enqueue(first_wrapper, patient_finished, patientid,t2_reg.id,fla_reg.id, timeout=6000, depends_on=[t1c_reg, t2_reg, fla_reg])
Run Code Online (Sandbox Code Playgroud)
旧版本(RQ < 1.8)
我使用此解决方法:如果依赖项为n,我创建实际函数的n-1包装器:每个包装器都依赖于不同的作业。
这个解决方案有点复杂,但它有效。
rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
notify = redisqueue.enqueue(first_wrapper, patient_finished, patientid,t2_reg.id,fla_reg.id, timeout=6000, depends_on=t1c_reg)
def first_wrapper(patient_finished, patientid,t2_reg_id,fla_reg_id):
queue = Queue('YOUR-QUEUE-NAME'))
queue.enqueue(second_wrapper, patient_finished, patientid, fla_reg_id, timeout=6000, depends_on=t2_reg_id)
def second_wrapper(patient_finished, patientid,fla_reg_id):
queue = Queue('YOUR-QUEUE-NAME'))
queue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=fla_reg_id)
Run Code Online (Sandbox Code Playgroud)
一些注意事项:
我不将队列对象传递给包装器,因为会发生一些序列化问题;所以,必须按名称恢复队列......
出于同样的原因,我将 job.id (而不是作业对象)传递给包装器。
归档时间: |
|
查看次数: |
920 次 |
最近记录: |