wks*_*rtz 8 python parallel-processing python-3.x
这是我最近的问题的扩展,避免了Python 3的多处理队列中的竞争条件.希望这个版本的问题更加具体.
TL; DR:在一个多处理模型中,工作进程是从队列中提供的multiprocessing.Queue,为什么我的工作进程如此空闲?每个进程都有自己的输入队列,因此它们不会为共享队列的锁而互相争斗,但队列花费大量时间实际上是空的.主进程是运行I/O绑定线程 - 是否会减慢输入队列的CPU绑定填充?
我试图在一定的约束下找到N个集合的笛卡尔乘积的最大元素,每个集合具有M_i个元素(对于0 <= i <N).回想一下,笛卡尔积的元素是长度为N的元组,其元素是N组的元素.我将这些元组称为"组合",以强调我正在循环遍历原始集合的每个组合.当我的函数is_feasible返回时,组合符合约束True.在我的问题中,我试图找到其元素具有最大权重的组合:sum(element.weight for element in combination).
我的问题规模很大,但我公司的服务器也是如此.我正在尝试将以下串行算法重写为并行算法.
from operator import itemgetter
from itertools import product # Cartesian product function from the std lib
def optimize(sets):
"""Return the largest (total-weight, combination) tuple from all
possible combinations of the elements in the several sets, subject
to the constraint that is_feasible(combo) returns True."""
return max(
map(
lambda combination: (
sum(element.weight for element in combination),
combination
),
filter(
is_feasible, # Returns True if combo meets constraint
product(*sets)
)
),
key=itemgetter(0) # Only maximize based on sum of weight
)
Run Code Online (Sandbox Code Playgroud)
我目前的多处理方法是创建工作进程并使用输入队列提供它们的组合.当工人收到毒丸时,他们将他们在输出队列中看到的最佳组合放置并退出.我从主进程的主线程填充输入队列.这种技术的一个优点是我可以从主进程生成一个辅助线程来运行一个监视工具(我只能使用一个REPL来查看到目前为止已处理了多少组合以及队列有多满).
+-----------+
in_q0 | worker0 |----\
/-------+-----------+ \
+-----------+ in_q1 +-----------+ \ out_q +-----------+
| main |-----------| worker1 |-----------| main |
+-----------+ +-----------+ / +-----------+
\-------+-----------+ /
in_q2 | worker2 |----/
+-----------+
Run Code Online (Sandbox Code Playgroud)
我原本让所有工作人员从一个输入队列中读取,但发现它们都没有击中CPU.确定他们花了所有时间等待queue.get()取消阻止,我给了他们自己的队列.这增加了对CPU的压力,所以我认为工人更活跃.但是,队列大部分时间都是空的!(我从我提到的监控REPL中知道这一点).这告诉我,填充队列的主循环很慢.这是循环:
from itertools import cycle
main():
# (Create workers, each with its own input queue)
# Cycle through each worker's queue and add a combination to that queue
for combo, worker in zip(product(*sets), cycle(workers)):
worker.in_q.put(combo)
# (Collect results and return)
Run Code Online (Sandbox Code Playgroud)
我猜测瓶颈是worker.in_q.put().我该如何加快速度?我的第一直觉是让工作者变慢,但这没有意义......监视器线程是否经常停止循环的问题?我怎么能说出来?
或者,有没有另一种方法来实现这个不涉及如此多的等待锁?
你的元素是什么样的?可能是腌制它们以将它们放入队列的速度很慢,这显然会成为瓶颈。请注意,每个元素都被一遍又一遍地独立腌制。
如果是这种情况,这种方法可能会有所帮助:
pickle.dumps一次,然后将相同的字符串传输给每个工作人员,或者可能通过共享内存或其他方式)。product(my_A_subset, *other_sets)(可能以不同的顺序),在每个作业(或每三个作业或其他)之间轮询某种停止信号。这不需要通过队列,一位共享内存值就可以正常工作。| 归档时间: |
|
| 查看次数: |
1078 次 |
| 最近记录: |