为什么Python多处理管道不安全?

ElB*_*ule 9 python unsafe pipe multiprocessing

Pipes当有多个发送者和接收者时,我不明白为什么说不安全.

Queues如果是这种情况,如何将以下代码转换为代码?关闭时Queues不要抛出EOFError,所以我的进程无法停止.我是否应该无休止地发送"Poison"消息告诉他们停止(这样,我确定我的所有进程都至少收到一个毒药)?

我想保持管道p1打开,直到我另有决定(这是我发送10条消息的时候).


from multiprocessing import Pipe, Process
from random import randint, random
from time import sleep

def job(name, p_in, p_out):
    print(name + ' starting')
    nb_msg = 0
    try:
        while True:
            x = p_in.recv()
            print(name + ' receives ' + x)
            nb_msg = nb_msg + 1
            p_out.send(x)
            sleep(random())
    except EOFError:
        pass
    print(name + ' ending ... ' + str(nb_msg) + ' message(s)')

if __name__ == '__main__':
    p1_in, p1_out = Pipe()
    p2_in, p2_out = Pipe()

    proc = []

    for i in range(3):
        p = Process(target=job, args=(str(i), p1_out, p2_in))
        p.start()
        proc.append(p)

    for x in range(10):
        p1_in.send(chr(97+x))
    p1_in.close()
    for p in proc:
        p.join()
    p1_out.close()
    p2_in.close()

    try:
        while True:
            print(p2_out.recv())
    except EOFError:
        pass

    p2_out.close()
Run Code Online (Sandbox Code Playgroud)

nne*_*neo 16

本质上,问题是Pipe围绕平台定义的管道对象的薄包装.recv只需重复接收一个字节缓冲区,直到获得完整的Python对象.如果两个线程或进程recv在同一个管道上使用,则读取可能会交错,从而使每个进程都有半个pickle对象,从而破坏数据.Queues在进程之间进行适当的同步,但代价是更复杂.

正如multiprocessing文档所说:

请注意,如果两个进程(或线程)同时尝试读取或写入管道的同一端,则管道中的数据可能会损坏.当然,同时使用管道的不同端的进程不存在损坏的风险.

你不必无休止地送毒药; 每个工人一个就是你所需要的.每个工人在退出之前都会拿出一个毒丸,所以工人不会错过这个消息.

您还应考虑使用multiprocessing.Pool而不是重新实现"工作进程"模型 - Pool有许多方法可以使分布跨多个线程的工作变得非常容易.

  • 如果这样做,基本上会得到一个“队列”-“ multiprocessing.Queue”是一个“管道”,并带有一对锁(每个方向一个)。因此,这将是安全且合理有效的,但是您也将直接重新发明轮子-为什么不仅仅使用`Queue`? (2认同)

Jan*_*cke 8

我不明白为什么当有多个发送者和接收者时,管道被认为是不安全的.

考虑您同时将水从源A和B放入管道中.在管道的另一端,你不可能找到哪一部分水来自A或B,对吗?:)

管道在字节级别上传输数据流.如果没有通信协议,它就不知道消息是什么,因此无法确保消息的完整性.因此,使用具有多个发送器的管道不仅"不安全".这是一个重大的设计缺陷,很可能会导致沟通问题.

但是,队列是在更高级别上实现的.它们用于传递消息(甚至是抽象对象).队列用于保持消息/对象自包含.多个源可以将对象放入队列中,并且多个消费者可以拉动这些对象,同时100%确定作为一个单元进入队列的任何内容也作为一个单元出来.

编辑了很长一段时间后:

我应该在字节流中添加,所有字节的检索顺序与发送(保证)相同.多个发件人的问题是发送顺序(输入顺序)可能已经不清楚或随机,即多个流可能以不可预测的方式混合.

通用队列实现可确保单个消息保持不变,即使有多个发件人也是如此.消息也按发送的顺序检索.但是,对于多个竞争发送器而没有进一步的同步机制,再次无法保证输入消息的顺序.