python asyncio如何读取StdIn并写入StdOut?

use*_*309 5 python stdin stdout python-asyncio

我需要异步读取 StdIn 以获取消息(json 由 \r\n 终止),并在处理异步后将更新的消息写入 StdOut。

目前我正在同步进行,例如:

class SyncIOStdInOut():
    def write(self, payload: str):
        sys.stdout.write(payload)
        sys.stdout.write('\r\n')
        sys.stdout.flush()

    def read(self) -> str:
        payload=sys.stdin.readline()
        return  payload
Run Code Online (Sandbox Code Playgroud)

如何执行相同但异步的操作?

ale*_*ame 11

这里的回波的一个例子stdin,以stdout使用ASYNCIO流(对于Unix)。

import asyncio
import sys


async def connect_stdin_stdout():
    loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
    return reader, writer


async def main():
    reader, writer = await connect_stdin_stdout()
    while True:
        res = await reader.read(100)
        if not res:
            break
        writer.write(res)


if __name__ == "__main__":
    asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

作为即用型解决方案,您可以使用aioconsole库。它实现了一个类似的方法,而且还提供附加的有用异步当量inputprintexeccode.interact

from aioconsole import get_standard_streams

async def main():
    reader, writer = await get_standard_streams()
Run Code Online (Sandbox Code Playgroud)

更新:

让我们试着弄清楚这个函数是如何connect_stdin_stdout工作的。

  1. 获取当前事件循环:
loop = asyncio.get_event_loop()
Run Code Online (Sandbox Code Playgroud)
  1. 创建StreamReader实例。
reader = asyncio.StreamReader()
Run Code Online (Sandbox Code Playgroud)

通常,StreamReader/StreamWriter类不打算直接实例化,而应仅作为open_connection()和等函数的结果使用start_server()StreamReader为某些数据流提供缓冲的异步接口。一些源(库代码)调用其功能,例如feed_datafeed_eof,该数据被缓冲,并且可以使用被读取记录接口协程read()readline()和等

  1. 创建StreamReaderProtocol实例。
protocol = asyncio.StreamReaderProtocol(reader)
Run Code Online (Sandbox Code Playgroud)

此类派生自asyncio.ProtocolandFlowControlMixin并有助于在Protocol和之间进行调整StreamReader。它覆盖了这样的Protocol方法为data_receivedeof_received并调用StreamReader方法feed_data

  1. stdin在事件循环中注册标准输入流。
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
Run Code Online (Sandbox Code Playgroud)

connect_read_pipe函数将一个pipe类似文件的对象作为参数。stdin是一个类似文件的对象。从现在开始,所有从 读取的数据stdin都将落入StreamReaderProtocol,然后传入StreamReader

  1. stdout在事件循环中注册标准输出流。
w_transport, w_protocol = await loop.connect_write_pipe(FlowControlMixin, sys.stdout)
Run Code Online (Sandbox Code Playgroud)

connect_write_pipe您需要传递一个协议工厂,该工厂创建实现流控制逻辑的协议实例StreamWriter.drain()。这个逻辑在类中实现FlowControlMixin。也StreamReaderProtocol继承了它。

  1. 创建StreamWriter实例。
writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
Run Code Online (Sandbox Code Playgroud)

这个类转发数据传递给它的使用功能write()writelines()等到底层transport

protocol用于支持drain()等待底层传输已刷新其内部缓冲区并可以再次写入的功能。

reader是一个可选参数并且可以None,它也用于支持该drain()功能,在此功能开始时检查是否为阅读器设置了异常,例如,由于连接丢失(与套接字和双向连接相关) ),那么drain()也会抛出异常。

您可以在这个很棒的答案中阅读更多关于StreamWriterdrain()功能的信息。

更新 2:

读取带有\r\n分隔符的行readuntil可以使用