ElB*_*ule 9 python unsafe pipe multiprocessing
Pipes当有多个发送者和接收者时,我不明白为什么说不安全.
Queues如果是这种情况,如何将以下代码转换为代码?关闭时Queues不要抛出EOFError,所以我的进程无法停止.我是否应该无休止地发送"Poison"消息告诉他们停止(这样,我确定我的所有进程都至少收到一个毒药)?
我想保持管道p1打开,直到我另有决定(这是我发送10条消息的时候).
from multiprocessing import Pipe, Process
from random import randint, random
from time import sleep
def job(name, p_in, p_out):
print(name + ' starting')
nb_msg = 0
try:
while True:
x = p_in.recv()
print(name + ' receives ' + x)
nb_msg = nb_msg + 1
p_out.send(x)
sleep(random())
except EOFError:
pass
print(name + ' ending ... ' + str(nb_msg) + ' message(s)')
if __name__ == '__main__':
p1_in, p1_out = Pipe()
p2_in, p2_out = Pipe()
proc = []
for i in range(3):
p = Process(target=job, args=(str(i), p1_out, p2_in))
p.start()
proc.append(p)
for x in range(10):
p1_in.send(chr(97+x))
p1_in.close()
for p in proc:
p.join()
p1_out.close()
p2_in.close()
try:
while True:
print(p2_out.recv())
except EOFError:
pass
p2_out.close()
Run Code Online (Sandbox Code Playgroud)
nne*_*neo 16
本质上,问题是Pipe围绕平台定义的管道对象的薄包装.recv只需重复接收一个字节缓冲区,直到获得完整的Python对象.如果两个线程或进程recv在同一个管道上使用,则读取可能会交错,从而使每个进程都有半个pickle对象,从而破坏数据.Queues在进程之间进行适当的同步,但代价是更复杂.
正如multiprocessing文档所说:
请注意,如果两个进程(或线程)同时尝试读取或写入管道的同一端,则管道中的数据可能会损坏.当然,同时使用管道的不同端的进程不存在损坏的风险.
你不必无休止地送毒药; 每个工人一个就是你所需要的.每个工人在退出之前都会拿出一个毒丸,所以工人不会错过这个消息.
您还应考虑使用multiprocessing.Pool而不是重新实现"工作进程"模型 - Pool有许多方法可以使分布跨多个线程的工作变得非常容易.
我不明白为什么当有多个发送者和接收者时,管道被认为是不安全的.
考虑您同时将水从源A和B放入管道中.在管道的另一端,你不可能找到哪一部分水来自A或B,对吗?:)
管道在字节级别上传输数据流.如果没有通信协议,它就不知道消息是什么,因此无法确保消息的完整性.因此,使用具有多个发送器的管道不仅"不安全".这是一个重大的设计缺陷,很可能会导致沟通问题.
但是,队列是在更高级别上实现的.它们用于传递消息(甚至是抽象对象).队列用于保持消息/对象自包含.多个源可以将对象放入队列中,并且多个消费者可以拉动这些对象,同时100%确定作为一个单元进入队列的任何内容也作为一个单元出来.
编辑了很长一段时间后:
我应该在字节流中添加,所有字节的检索顺序与发送(保证)相同.多个发件人的问题是发送顺序(输入顺序)可能已经不清楚或随机,即多个流可能以不可预测的方式混合.
通用队列实现可确保单个消息保持不变,即使有多个发件人也是如此.消息也按发送的顺序检索.但是,对于多个竞争发送器而没有进一步的同步机制,再次无法保证输入消息的顺序.
| 归档时间: |
|
| 查看次数: |
9210 次 |
| 最近记录: |