Enz*_*ime 2 multithreading mpi python-2.7
好的,所以我想在树状结构中进行多线程深度优先搜索。为此,我使用集群中多台计算机的线程(本例中使用 localhost 四核和树莓派 2)。主线程应该启动该进程,并且在树中的第一次分割时,对于它分割成的每个节点,它应该生成一个新线程。然后,这些线程应该能够将它们的发现报告给主线程。
我试图动态地执行此操作,而不是为 mpiexec 提供多个线程,因为我事先不知道树会是什么样子(例如可能有 2 或 9 个分割)。
我从我正在处理的项目中制作了一个样本来解决这个问题,并且它的工作原理如下。它从一串数字中取出一个数字,并为每个数字生成一个线程并将该数字发送到该线程。
对于大师来说:
#!/usr/bin/python
from mpi4py import MPI
import datetime, sys, numpy, time
################ Set up MPI variables ################
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
name = MPI.Get_processor_name()
status = MPI.Status()
################ Master code ################
script = 'cpi.py'
for d in '34':
try:
print 'Trying to spawn child process...'
icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
spawnrank = icomm.Get_rank()
icomm.send(d, dest=spawnrank, tag=11)
print 'Spawned rank %d.' % spawnrank
except: ValueError('Spawn failed to start.')
solved = False
while solved == False:
#while not comm.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG):
# print 'spawns doing some work...'
# time.sleep(1)
solved = comm.recv(source=MPI.ANY_SOURCE, tag=22)
print 'received solution: %d' % solved
Run Code Online (Sandbox Code Playgroud)
它正确地生成了工人,他们收到数字但不将其发送回主人。工人的代码如下:
工人
#!/usr/bin/python
from mpi4py import MPI
import datetime, sys, numpy
################ Set up MPI variables ################
icomm = MPI.Comm.Get_parent()
comm = MPI.COMM_WORLD
irank = comm.Get_rank()
rank = comm.Get_rank()
running = True
while running:
data = None
data = icomm.recv(source=0, tag=11)
if data:
print 'Trying to send %s from worker rank %d to %d' % (data, rank, irank)
icomm.send(data, dest=0, tag=22)
break
print 'Worker on rank %d done.' % rank
icomm.Disconnect()
Run Code Online (Sandbox Code Playgroud)
它永远不会到达主代码的最后一行。我还在主代码中添加(注释掉)了一个探测器,以检查带有标签 22 的消息是否挂在某处,从而排除了 recv 函数中的错误,但探测器从未找到该消息。所以我认为它永远不会发送。
通过打印两个进程的排名,我发现它们都使用排名 0,这是有道理的,因为它们是在同一台计算机上生成的。但是当我添加主机文件和等级文件,试图强制它为从属设备使用不同的计算机时,它给了我以下错误:
[hch-K55A:06917] *** Process received signal ***
[hch-K55A:06917] Signal: Segmentation fault (11)
[hch-K55A:06917] Signal code: Address not mapped (1)
[hch-K55A:06917] Failing at address: 0x3c
[hch-K55A:06917] [ 0] /lib/x86_64-linux-gnu/libpthread.so.0(+0x10340) [0x7f2c0d864340]
[hch-K55A:06917] [ 1] /usr/lib/openmpi/lib/openmpi/mca_rmaps_rank_file.so(orte_rmaps_rank_file_lex+0x4a0) [0x7f2c0abdcb70]
[hch-K55A:06917] [ 2] /usr/lib/openmpi/lib/openmpi/mca_rmaps_rank_file.so(+0x23ac) [0x7f2c0abda3ac]
[hch-K55A:06917] [ 3] /usr/lib/libopen-rte.so.4(orte_rmaps_base_map_job+0x2e) [0x7f2c0dacd05e]
[hch-K55A:06917] [ 4] /usr/lib/libopen-rte.so.4(orte_plm_base_setup_job+0x5a) [0x7f2c0dac580a]
[hch-K55A:06917] [ 5] /usr/lib/openmpi/lib/openmpi/mca_plm_rsh.so(orte_plm_rsh_launch+0x338) [0x7f2c0b80a8c8]
[hch-K55A:06917] [ 6] /usr/lib/libopen-rte.so.4(+0x51ff4) [0x7f2c0dac3ff4]
[hch-K55A:06917] [ 7] /usr/lib/libopen-rte.so.4(opal_event_base_loop+0x31e) [0x7f2c0dae9cfe]
[hch-K55A:06917] [ 8] mpiexec() [0x4047d3]
[hch-K55A:06917] [ 9] mpiexec() [0x40347d]
[hch-K55A:06917] [10] /lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf5) [0x7f2c0d4b0ec5]
[hch-K55A:06917] [11] mpiexec() [0x403399]
[hch-K55A:06917] *** End of error message ***
Segmentation fault (core dumped)
Run Code Online (Sandbox Code Playgroud)
使用的命令: mpiexec -np 1 --hostfile hostfile --rankfilerankfile python spawntest.py
主机文件:本地主机本地主机插槽= 1最大插槽= 4 pi2@raspi2插槽= 4
Rankfile:排名 0=localhost 插槽=1 排名 1=pi2@raspi2 插槽=1-4
所以我的问题如下;如何在主机以外的计算机上生成这些线程,同时能够来回发送数据?
你的主人的代码是非常错误的,我觉得你对那里发生的事情缺乏一些概念性的理解。
MPI_COMM_SPAWN由(或其 mpi4py 对应项)生成的作业中的 MPI 进程comm.Spawn()不会成为父级MPI_COMM_WORLD. 生成的进程形成一个完全独立的世界通信器,并通过互通器与父作业互连,这正是生成返回的内容。在您的情况下,icomm = MPI.COMM_SELF.Spawn(...)是主进程中的内部通信器句柄。子作业中的进程使用MPI_COMM_GET_PARENT(MPI.Comm.Get_parent()在 mpi4py 中)获取内部通信器句柄。由于您正在生成单进程作业:
MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
^^^^^^^^^^
Run Code Online (Sandbox Code Playgroud)
在新形成的子作业世界通讯器中只有一个进程,因此MPI.COMM_WORLD.Get_rank()每个工人返回零。
您的主人代码的这一部分是错误的,但由于内部通信器的实际工作方式,它仍然有效:
spawnrank = icomm.Get_rank() # <--- not what you expect
icomm.send(d, dest=spawnrank, tag=11)
Run Code Online (Sandbox Code Playgroud)
内部通信器连接两个独立的进程组。其中一个称为本地组,另一个称为远程组。在对讲机上使用MPI_COMM_RANK( ) 时,可以获得本地组comm.Get_rank()中呼叫进程的排名。但是,在发送或接收时,指定的等级与远程组相关。在您的情况下,产生一个新的工作人员会产生以下交互器:
MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
^^^^^^^^^^
Run Code Online (Sandbox Code Playgroud)
(上面的通讯器显示了每个组来自哪里;通讯器本身不属于内部通讯器的一部分)
哪个组是本地的,哪个是远程的,取决于调用进程属于哪个组。主进程的本地组是子作业中的级别的远程组,反之亦然。这里重要的是,每个组的等级为 0,因为一组中至少有一个进程。你很幸运,主组中有一个进程,因此icomm.Get_rank()返回 0(并且它总是返回零,因为主组的本地组派生自MPI_COMM_SELF,它总是包含一个进程),它恰好(总是)是远程(子)组中的有效排名。正确的做法是将消息发送到您知道远程组中存在的固定排名,例如rank 0:
icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
icomm.send(d, dest=0, tag=11)
Run Code Online (Sandbox Code Playgroud)
(此代码明确发送到0远程组的排名,而在此之前该值0只是一个幸运的巧合)
也就是说,发送部分虽然不正确,但仍然有效。接收部分不这样做,有几个原因。首先,您使用了错误的通信器 - 接收MPI_COMM_WORLD不起作用,因为子进程不是它的成员。事实上,MPI 中的通信器是不可变的 - 如果不创建新的通信器,则无法添加或删除等级。您应该使用icomm从工作人员处接收的方式,就像使用它向他们发送的方式一样。现在,出现了第二个问题 -icomm主作业被每个新作业覆盖Spawn,因此您实际上失去了与除最后一个作业之外的任何子作业进行通信的能力。您需要保留句柄列表并将句柄附加到其中。
接收部分有点复杂。不存在MPI_ANY_COMM- 您无法进行涵盖所有子作业的接收操作,因为它们都位于各自单独的内部通信器中。您应该循环遍历MPI_IPROBE内部通信器列表,或者(更好)从每个子级开始非阻塞接收,然后使用MPI_WAIT_SOME(无论 mpi4py 等效项是什么)。
使用循环,主代码应该看起来像这样(注意 - 未经测试的代码,我没有和/或使用 mpi4py):
#!/usr/bin/python
from mpi4py import MPI
import datetime, sys, numpy, time
################ Set up MPI variables ################
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
name = MPI.Get_processor_name()
status = MPI.Status()
################ Master code ################
icomms = []
script = 'cpi.py'
for d in '34':
try:
print 'Trying to spawn child process...'
icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
icomm.send(d, dest=0, tag=11)
icomms.append(icomm)
print 'Spawned a child.'
except: ValueError('Spawn failed to start.')
solved = False
while not solved and icomms:
for icomm in icomms:
if icomm.Iprobe(source=0, tag=MPI.ANY_TAG):
print 'A child responded...'
solved = icomm.recv(source=0, tag=MPI.ANY_TAG)
icomm.Disconnect()
icomms.remove(icomm)
if solved: break
if not solved:
print 'spawns doing some work...'
time.sleep(1)
# make sure all pending sends get matched
for icomm in icomms:
icomm.recv(source=0, tag=MPI.ANY_TAG)
icomm.Disconnect()
print 'received solution: %d' % solved
Run Code Online (Sandbox Code Playgroud)
我希望你能明白。
另外:如果您从派生作业中派生作业,则新子作业无法轻松建立与顶级主作业的连接。为此,您应该转向 MPI-2 客户端/服务器模型支持的一个不起眼的部分,并让主服务器使用 打开端口MPI_PORT_OPEN,然后使用 MPI 命名服务将其注册MPI_PUBLISH_NAME,最后使用MPI_COMM_ACCEPT接收来自任何其他 MPI 作业的连接。工作人员应用于MPI_LOOKUP_NAME获取对端口的引用,并用于MPI_COMM_CONNECT与主工作建立内部通信。我不知道 mpi4py 中是否存在这些函数的包装器,如果存在,它们是如何命名的。