是否有书籍或教程显示了如何正确使用asyncio的协议?

Sha*_*baz -1 python python-asyncio

是否有书籍或教程显示了如何正确使用asyncio的协议?Web上的所有示例都将IO混合到协议定义中!

我想编写一个解析器,该解析器进行帧解码并将消息转换为python数据结构。解析此数据结构后,我想将其传递给client

[       ]-->[*protocol parser*]-->[high level api]-->[           ]
[network]                                            [client code]
[       ]<--[*protocol parser*]<--[high level api]<--[           ]
Run Code Online (Sandbox Code Playgroud)

相应地,高级API的客户端传递python数据结构,高级API将该数据结构传递给我的协议,该协议将其转换为正确的字节/文本表示形式并将其传递给传输层。

我假设这是首先抽象出Protocol类的目的。我不想从协议中响应连接的另一端,但这是大多数Web教程显示的内容!

此外,我想了解python世界中提供了哪个高级接口,它是回调,流接口还是其他?

1st*_*st1 5

我想编写一个解析器,该解析器进行帧解码并将消息转换为python数据结构。解析完此数据结构后,我想将其传递给客户端。

[       ]-->[*protocol parser*]-->[high level api]-->[           ]
[network]                                            [client code]
[       ]<--[*protocol parser*]<--[high level api]<--[           ]
Run Code Online (Sandbox Code Playgroud)

相应地,高级API的客户端传递python数据结构,高级API将该数据结构传递给我的协议,该协议将其转换为正确的字节/文本表示形式并将其传递给传输层。

您可以从两个不同的角度来研究协议实现:

  1. 使用低级 asyncio.Protocol。为此,我们有两个API:loop.create_serverloop.create_connection。前者用于创建可暴露于网络并接受客户端连接的服务器(例如HTTP服务器)。后者可用于实现客户端(例如API客户端,数据库驱动程序等)。

    的核心思想Protocol非常简单:建立连接后,它可以实现一个connection_made()方法,该方法由loop.create_server或由调用loop.create_connection。该协议将接收Transport对象的实例,该实例可用于将数据发送回客户端。

    它还可以实现data_received()方法,当有传入数据要处理时,事件循环将调用该方法。通用方法是为要实现的协议编写一个缓冲区抽象,该抽象可以解析数据。缓冲区中有足够的数据可解析和处理后,您可以将结果放入asyncio.Queue或安排一些异步任务。例如:

    class MyProtocolBuffer:
        """Primitive protocol parser"""
    
        def __init__(self):
            self.buf = b''  # for real code use bytearray
                            # or list of memoryviews
    
        def feed_data(data):
            self.buf += data
    
        def has_complete_message(self):
            # Implement your parsing logic here...
    
        def read_message(self):
            # ...and here.
    
    
    class MyProtocol:
        def __init__(self, loop, queue: asyncio.Queue):
            self.buffer = MyProtocolReadBuffer()
    
        def data_received(self, data):
            self.buffer.feed_data(data)
            while self.buffer.has_complete_message():
                message = self.buffer.read_message()
                queue.put_nowait(message)
    
    
    async def main(host, port):
        queue = asyncio.Queue()
    
        loop = asyncio.get_event_loop()       
        transport, protocol = await loop.create_connection(
            lambda: MyProtocol(loop, queue),
            host, port)
    
        try:
            while True:
                message = await queue.get()
                # This is where you implement your "high level api"
                # or even "client code".
                print(f'received message {message!r}')
        finally:
            transport.close()
    
    
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(host, port))
    finally:
        loop.close()
    
    Run Code Online (Sandbox Code Playgroud)

    请注意,这是一个低级的异步API。 它旨在供框架和库作者使用。例如,高性能asyncio PostgreSQL驱动程序asyncpg使用这些API。

    您可以在此处阅读有关协议的更多信息:https : //docs.python.org/3/library/asyncio-protocol.html#protocols。成功使用这些API的一个很好的例子是asyncpg库:https : //github.com/magicstack/asyncpg

  2. 使用高级异步流。流允许您使用异步/等待语法实现协议。两个主要的API:asyncio.open_connection()asyncio.start_server()。让我们使用来重新实现以上示例open_connection()

    async def main():
        reader, writer = await asyncio.open_connection(host, port)
    
        message_line = await reader.readline()
        # Implement the rest of protocol parser using async/await
        # and `reader.readline()`, `reader.readuntil()`, and
        # `reader.readexactly()` methods.
    
        # Once you have your message you can put in an asyncio.Queue,
        # or spawn asyncio tasks to process incoming messages.
    
        # This is where you implement your "high level api"
        # or even "client code".
    
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(host, port))
    finally:
        loop.close()        
    
    Run Code Online (Sandbox Code Playgroud)

我建议始终使用流起草一些协议的第一个实现,特别是如果您不熟悉网络和异步的话。您可以使用低级API来构建更快的代码,使用流中的错误可以使工作程序更快,并且代码库更易于维护。理想情况下,您应该只坚持使用异步/等待和高级异步API。