我可以使用 asyncio 读取和写入 multiprocessing.Pipe 吗?

Dav*_*rks 7 python pipe multiprocessing python-3.x python-asyncio

我需要在 Pythonasyncio中的进程之间进行通信,并在每个进程中使用并发网络 IO。

目前我使用multiprocessing.Pipesendrecv显著大量的进程之间的数据,但我这样做的以外asyncio,我相信我花了大量的CPU时间IO_WAIT的缘故吧。

似乎asyncio可以而且应该用于处理进程之间的管道 IO,但是除了管道 STDIN/STDOUT 之外,我找不到任何示例。

从我读到的内容来看,我似乎应该注册管道loop.connect_read_pipe(PROTOCOL_FACTORY, PIPE)并同样进行写入。但是我不明白它的目的,protocol_factory因为它与multiprocessing.Pipe. 甚至不清楚我是否应该创建一个multiprocessing.Pipe或我是否可以在asyncio.

ski*_*dge 11

multiprocessing.Pipe使用高级multiprocessing.Connection模块来pickle和unpickle Python对象并在底层传输额外的字节。如果您想使用从这些管道之一读取数据loop.connect_read_pipe(),则必须自己重新实现所有这些。

在不阻塞事件循环的情况下读取 a 的最简单方法multiprocessing.Pipe是使用loop.add_reader(). 考虑以下示例:

import asyncio
import multiprocessing


def main():
    read, write = multiprocessing.Pipe(duplex=False)
    writer_process = multiprocessing.Process(target=writer, args=(write,))
    writer_process.start()
    asyncio.get_event_loop().run_until_complete(reader(read))


async def reader(read):
    data_available = asyncio.Event()
    asyncio.get_event_loop().add_reader(read.fileno(), data_available.set)

    while not read.poll():
        await data_available.wait()
        data_available.clear()

    print(read.recv())


def writer(write):
    write.send('Hello World')


if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

使用较低级别创建的管道os.pipe不会像管道那样添加任何额外内容multiprocessing.Pipe。因此,我们可以使用os.pipewith loop.connect_read_pipe(),而无需重新实现任何类型的内部工作。这是一个例子:

import asyncio
import multiprocessing
import os


def main():
    read, write = os.pipe()
    writer_process = multiprocessing.Process(target=writer, args=(write,))
    writer_process.start()
    asyncio.get_event_loop().run_until_complete(reader(read))


async def reader(read):
    pipe = os.fdopen(read, mode='r')

    loop = asyncio.get_event_loop()
    stream_reader = asyncio.StreamReader()
    def protocol_factory():
        return asyncio.StreamReaderProtocol(stream_reader)

    transport, _ = await loop.connect_read_pipe(protocol_factory, pipe)
    print(await stream_reader.readline())
    transport.close()


def writer(write):
    os.write(write, b'Hello World\n')


if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

这段代码帮助我弄清楚了如何使用loop.connect_read_pipe.


kma*_*ork 6

aiopipe似乎做你想做的事!它可以与内置模块一起使用multiprocessing,并提供与常规阻塞管道类似的 API。