如何在两个 Python 异步协程之间使用读/写流?

lep*_*rem 5 python pipe python-asyncio

如何使用 asyncio 在两个协程之间实现管道,一个协程从流中读取,另一个向其中写入?

假设我们有这个现有代码,两个简单的脚本。生成标准输出的一个:

# produce.py

import asyncio
import random
import sys

async def produce(stdout):
    for i in range(10000):
        await asyncio.sleep(random.randint(0, 3))
        print(i, file=stdout, flush=True)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(produce(sys.stdout))
    loop.close()
Run Code Online (Sandbox Code Playgroud)

另一个从标准输入读取:

# consume.py

async def consume(loop, stdin):
    reader = asyncio.StreamReader(loop=loop)
    reader_protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: reader_protocol, stdin)

    while True:
        line = await reader.readline()
        if not line:
            break
        print(int(line) ** 2)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(consume(loop, sys.stdin))
    loop.close()
Run Code Online (Sandbox Code Playgroud)

显然,由于我们的两个部分可以从命令行单独运行,因此我们可以使用subprocess带有 shell 管道 ( produce | consume) 的模块。

但我们希望在 Python 中实现相当于 Unix 管道的功能,即连接这两个现有函数的流。

像这样的事情是行不通的:

pipe = io.BytesIO()

await asyncio.gather(produce(pipe),
                     consume(loop, pipe))
Run Code Online (Sandbox Code Playgroud)

如果这两个函数可以操作生成器,我们可以编写如下代码(python 3.6):

async def produce():
    for i in range(10000):
        await asyncio.sleep(random.randint(0, 3))
        yield str(i)


async def consume(generator):
    async for value in generator:
        print(int(value) ** 2)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(consume(produce()))
    loop.close()
Run Code Online (Sandbox Code Playgroud)

asyncio API 的某些部分是否允许这样做?

谢谢!

Nat*_*tim 6

解决这个问题的一种方法是将当前的函数转换为生成器并编写一些包装器以使用 Unix 管道公开它们:

# wrapper.py

import asyncio
import random
import sys


async def produce():
    for i in range(10000):
        await asyncio.sleep(random.randint(0, 3))
        yield str(i)


async def consume(generator):
    async for value in generator:
        print(int(value) ** 2)


async def system_out_generator(loop, stdout, generator):
    async for line in generator:
        print(line, file=stdout, flush=True)


async def system_in_generator(loop, stdin):
    reader = asyncio.StreamReader(loop=loop)
    reader_protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: reader_protocol, stdin)
    while True:
        line = await reader.readline()
        if not line:
            break
        yield line


async def main(loop):
    try:
        if sys.argv[1] == "produce":
            await system_out_generator(loop, sys.stdout, produce())
        elif sys.argv[1] == "consume":
            await consume(system_in_generator(loop, sys.stdin))
    except IndexError:
        await consume(produce())


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
Run Code Online (Sandbox Code Playgroud)

您可以使用:

python wrapper.py  # Python generators
Run Code Online (Sandbox Code Playgroud)

或者:

python wrapper.py produce | python wrapper.py consume  # System pipes
Run Code Online (Sandbox Code Playgroud)