Python RQ:回调模式

xia*_*012 6 python parallel-processing python-rq

我现在有大量文档要处理,并且正在使用 Python RQ 来并行化任务。

我希望完成一系列工作,因为对每个文档执行不同的操作。例如:A-> B->C表示将文档传递给 function AA完成后,继续B和 last C

然而,Python RQ 似乎并没有很好地支持管道的东西。

这是一个简单但有点脏的方法。一句话,流水线上的每个函数都以嵌套的方式调用它的下一个函数。

例如,对于管道A-> B-> C

在顶层,一些代码是这样写的:

q.enqueue(A, the_doc)

其中 q 是Queue实例,在函数中A有如下代码:

q.enqueue(B, the_doc)

在 中B,有这样的事情:

q.enqueue(C, the_doc)

还有比这更优雅的方式吗?例如ONE函数中的一些代码:

q.enqueue(A, the_doc) q.enqueue(B, the_doc, after = A) q.enqueue(C, the_doc, after= B)

depends_on参数是最接近我的要求的参数,但是,运行如下:

A_job = q.enqueue(A, the_doc) q.enqueue(B, depends_on=A_job )

不会工作。As 在q.enqueue(B, depends_on=A_job )被执行后立即A_job = q.enqueue(A, the_doc)执行。到 B 入队时,A 的结果可能还没有准备好,因为它需要时间来处理。

PS:

如果 Python RQ 不是很擅长这个,那么我可以使用 Python 中的其他什么工具来达到同样的目的:

  1. 循环并行化
  2. 管道处理支持

小智 0

dependent_on 参数是最接近我的要求的参数,但是,运行如下所示的命令:

A_job = q.enqueue(A, the_doc) q.enqueue(B, dependent_on=A_job )

行不通的。因为 q.enqueue(B, dependent_on=A_job ) 在 A_job = q.enqueue(A, the_doc) 执行后立即执行。当 B 入队时,A 的结果可能尚未准备好,因为处理需要时间。

对于这种情况,q.enqueue(B,depends_on=A_job)将在A_job完成后运行。如果结果没有准备好,q.enqueue(B,depends_on=A_job)将等待,直到它准备好。


它不支持开箱即用,但使用其他技术是可能的。

就我而言,我使用缓存来跟踪链中的前一个作业,因此当我们想要将新函数排入队列(在之后立即运行)时,我们可以在调用 enqueue() 时正确设置其“depends_on”参数

请注意使用附加参数进行排队:'timeout, result_ttl, ttl'。自从我在 rq 上运行长时间作业后就使用了这些。您可以在 python rq 文档中参考它们的使用。

我使用了 django_rq.enqueue() ,它源自python rq

    # main.py
    def process_job():
        ...

        # Create a cache key for every chain of methods you want to call.
        # NOTE: I used this for web development, in your case you may want
        # to use a variable or a database, not caching

        # Number of time to cache and keep the results in rq
        TWO_HRS = 60 * 60 * 2

        cache_key = 'update-data-key-%s' % obj.id
        previous_job_id = cache.get(cache_key)
        job = django_rq.enqueue(update_metadata,
                                campaign=campaign,
                                list=chosen_list,
                                depends_on=previous_job_id,
                                timeout=TWO_HRS,
                                result_ttl=TWO_HRS,
                                ttl=TWO_HRS)

        # Set the value for the most recent finished job, so the next function
        # in the chain can set the proper value for 'depends_on'
        cache.set(token_key, job.id, TWO_HRS)

    # utils.py
    def update_metadata(campaign, list):
        # Your code goes here to update the campaign object with the list object
        pass
Run Code Online (Sandbox Code Playgroud)

'depends_on' - 来自rq 文档

depends_on - 指定在该作业排队之前必须完成的另一个作业(或作业 ID)