Rut*_*ulV 5 python django redis python-rq
我已经开始使用 RQ/Redis 为我的 django 站点构建一些长时间运行的作业的异步执行。我希望做如下事情:
我想要一个模型的每个实例一个队列。你可以把这个模型想象成一个 api 用户帐户。(这些不会很多。最多 15 - 20 个)
我将在队列中均匀地分配批次的任务(从 10 到 500 之间的任意位置)。在第一批完成之前可以添加多批。
对于每个批次,我想为每个没有积极处理的队列启动一个工作程序,我想以批处理模式运行这些工作程序,以便一旦它们用完任务,它们就会关闭。
我意识到我不能以批处理模式运行它们,然后我将始终在所有队列上工作/监听工作。这样做的问题是我希望能够动态添加和删除队列,所以最好每批启动可用队列。
我意识到我在队列之间分配任务似乎很奇怪,但原因是同一队列中的每个任务必须根据我使用的服务进行速率限制/节流(将其视为 API速率限制,但每个队列代表不同的帐户)。但就我的目的而言,任务在哪个帐户上运行没有区别,所以我不妨跨所有帐户并行化。
我面临的问题是,如果我启动一个工人并给它一个已经在处理的队列,我现在有两个工人在该队列上独立运行,因此我预期的节流率会减半。如果没有工作人员在该队列上运行,我如何仅启动工作人员?我可能会为此找到一个 hacky 解决方案,但我更愿意以“正确”的方式处理它,因为我对队列没有太多经验,所以我想我应该问一下。
我已经在实现我自己的工作类,以便我可以动态控制队列,所以我只需要一种方法来添加逻辑,如果该队列已经在处理,则不会给它一个新的工作人员。我的工人的一个简单版本在这里:
# custom_worker.py
import sys
from Api.models import *
from rq import Queue, Connection, Worker
# importing the necessary namespace for the tasks to run
from tasks import *
# dynamically getting the queue names in which I am expecting tasks
queues = [user.name for user in ApiUser.objects.all()]
with Connection():
qs = list(map(Queue, queues)) or [Queue()]
w = Worker(qs)
w.work(burst=True)
Run Code Online (Sandbox Code Playgroud)
找到解决方案只意味着深入研究 python-rq 的源代码。我可能会考虑改进文档。无论如何,这似乎可以满足我的需求!
import sys
from Api.models import *
from rq import Queue, Connection, Worker
# importing the necessary namespace for the tasks to run
from tasks import *
# Provide queue names to listen to as arguments to this script,
with Connection():
current_workers = Worker.all()
working_queues = [queue.name for worker in current_workers for queue in worker.queues]
proposed_queues = [user.name for user in ApiUser.objects.all()]
queues_to_start = [queue for queue in proposed_queues if not queue in working_queues]
if len(queues_to_start) > 0:
qs = list(map(Queue, queues_to_start))
w = Worker(qs)
w.work(burst=True)
else:
print("Nothing to do here.")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1752 次 |
| 最近记录: |