ds_*_*ser 2 python parallel-processing multiprocessing python-3.x python-multiprocessing
我现在正在处理一个大数据集。我的输入将是 4 个不同的数据集,并且我必须对每个数据集应用特定的函数。所以我所做的就是读取所有四个数据集,并使用 pool.map 将函数并行应用到每个数据集。所以现在我有一个父进程和 4 个子进程。直到这一切都很好。
Q1. 现在每个进程内部发生了什么。在我应用于每个数据集的函数中,我将每个元组与其他元组进行比较,因此它是一种递归。有没有办法使其并行,因为这种比较可能需要很长时间,因为数据集很大。如何制作它,因为它已经是一个子进程?是否可以在子进程中再次并行化它,因为我有更多的处理器,所以我想利用它。
Q2。我对于这个递归任务的并行化的想法是,如果我将元组 x 与元组 y 进行比较(每个元组与所有其他元组),我可以为 x 制作块,每个块与 y 进行比较。我想这可以通过两个“for 循环”来完成。有什么建议如何做到这一点?
回复:Q1,如果您使用 a 创建子进程multiprocessing.Pool,那么不行,工作进程不能有子进程。尝试创建一个会引发异常:
AssertionError: daemonic processes are not allowed to have children
Run Code Online (Sandbox Code Playgroud)
原因已经说得很清楚了 - a 中的进程Pool是守护进程,并且守护进程不能有子进程。原因是终止父进程将终止其守护子进程,但守护子进程将无法终止其子进程,这将留下孤立进程。文档中对此进行了说明:
请注意,守护进程不允许创建子进程。否则,如果守护进程在其父进程退出时终止,则其子进程将成为孤立的。
您可以通过父进程创建一组非守护Process进程对象来解决这个问题,而不是使用Pool. 然后,每个孩子都可以创建自己的multiprocessing.Pool:
import multiprocessing
def subf(x):
print "in subf"
def f(x):
print "in f"
p = multiprocessing.Pool(2)
p.map(subf, range(2))
if __name__ == "__main__":
processes = []
for i in range(2):
proc = multiprocessing.Process(target=f, args=(i,))
proc.start()
processes.append(proc)
Run Code Online (Sandbox Code Playgroud)
输出:
in f
in f
in subf
in subf
in subf
in subf
Run Code Online (Sandbox Code Playgroud)
这种方法似乎适合您,因为您的初始数据集仅包含四个项目。您可以只为数据集中的每个项目创建一个Process,并且仍然有一些空闲 CPU 可供每个子进程在小型Pool.
回复:Q2,听起来您可以使用它来itertools.product为您想要比较的每对元组创建一个大型迭代。然后,您可以用于pool.map并行比较每一对。下面的示例展示了其工作原理:
def f(x):
print(x)
if __name__ == "__main__":
# Create two lists of tuples, like your use-case
x = zip(range(3), range(3,6))
y = zip(range(6, 9), range(9, 12))
pool = multiprocessing.Pool()
pool.map(f, itertools.product(x, y))
Run Code Online (Sandbox Code Playgroud)
输出:
((0, 3), (6, 9))
((0, 3), (7, 10))
((0, 3), (8, 11))
((1, 4), (6, 9))
((1, 4), (7, 10))
((1, 4), (8, 11))
((2, 5), (6, 9))
((2, 5), (8, 11))
((2, 5), (7, 10))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1976 次 |
| 最近记录: |