xia*_*012 6 python parallel-processing python-rq
我现在有大量文档要处理,并且正在使用 Python RQ 来并行化任务。
我希望完成一系列工作,因为对每个文档执行不同的操作。例如:A-> B->C表示将文档传递给 function A,A完成后,继续B和 last C。
然而,Python RQ 似乎并没有很好地支持管道的东西。
这是一个简单但有点脏的方法。一句话,流水线上的每个函数都以嵌套的方式调用它的下一个函数。
例如,对于管道A-> B-> C。
在顶层,一些代码是这样写的:
q.enqueue(A, the_doc)
其中 q 是Queue实例,在函数中A有如下代码:
q.enqueue(B, the_doc)
在 中B,有这样的事情:
q.enqueue(C, the_doc)
还有比这更优雅的方式吗?例如ONE函数中的一些代码:
q.enqueue(A, the_doc)
q.enqueue(B, the_doc, after = A)
q.enqueue(C, the_doc, after= B)
depends_on参数是最接近我的要求的参数,但是,运行如下:
A_job = q.enqueue(A, the_doc)
q.enqueue(B, depends_on=A_job )
不会工作。As 在q.enqueue(B, depends_on=A_job )被执行后立即A_job = q.enqueue(A, the_doc)执行。到 B 入队时,A 的结果可能还没有准备好,因为它需要时间来处理。
PS:
如果 Python RQ 不是很擅长这个,那么我可以使用 Python 中的其他什么工具来达到同样的目的:
小智 0
dependent_on 参数是最接近我的要求的参数,但是,运行如下所示的命令:
A_job = q.enqueue(A, the_doc) q.enqueue(B, dependent_on=A_job )
行不通的。因为 q.enqueue(B, dependent_on=A_job ) 在 A_job = q.enqueue(A, the_doc) 执行后立即执行。当 B 入队时,A 的结果可能尚未准备好,因为处理需要时间。
对于这种情况,q.enqueue(B,depends_on=A_job)将在A_job完成后运行。如果结果没有准备好,q.enqueue(B,depends_on=A_job)将等待,直到它准备好。
它不支持开箱即用,但使用其他技术是可能的。
就我而言,我使用缓存来跟踪链中的前一个作业,因此当我们想要将新函数排入队列(在之后立即运行)时,我们可以在调用 enqueue() 时正确设置其“depends_on”参数
请注意使用附加参数进行排队:'timeout, result_ttl, ttl'。自从我在 rq 上运行长时间作业后就使用了这些。您可以在 python rq 文档中参考它们的使用。
我使用了 django_rq.enqueue() ,它源自python rq
# main.py
def process_job():
...
# Create a cache key for every chain of methods you want to call.
# NOTE: I used this for web development, in your case you may want
# to use a variable or a database, not caching
# Number of time to cache and keep the results in rq
TWO_HRS = 60 * 60 * 2
cache_key = 'update-data-key-%s' % obj.id
previous_job_id = cache.get(cache_key)
job = django_rq.enqueue(update_metadata,
campaign=campaign,
list=chosen_list,
depends_on=previous_job_id,
timeout=TWO_HRS,
result_ttl=TWO_HRS,
ttl=TWO_HRS)
# Set the value for the most recent finished job, so the next function
# in the chain can set the proper value for 'depends_on'
cache.set(token_key, job.id, TWO_HRS)
# utils.py
def update_metadata(campaign, list):
# Your code goes here to update the campaign object with the list object
pass
Run Code Online (Sandbox Code Playgroud)
'depends_on' - 来自rq 文档:
depends_on - 指定在该作业排队之前必须完成的另一个作业(或作业 ID)