如何为asyncio创建自定义传输?

wvx*_*xvw 4 python python-asyncio

我正在阅读包中的文档ProtocolTransportasyncio.特别:

子类化协议类时,建议您覆盖某些方法.这些方法是回调:它们将由传输在某些事件上调用(例如,当收到某些数据时); 你不应该自己打电话,除非你正在实施运输.

强调补充说

因此,原则上应该可以实现运输,但......

传输是asyncio提供的类,用于抽象各种通信信道.您通常不会自己实例化运输; 相反,您将调用一个AbstractEventLoop方法 (哪一个?),它将创建传输并尝试启动底层通信通道,并在成功时回拨您.

再次强调了重点

阅读关于AbstractEventLoop的部分,我没有看到任何创建自定义传输的方法.它最接近的AbstractEventLoop.create_connection(protocol_factory, host=...)只是暗示它会创造某种插座......

好吧,我的最终目标是使用一个自定义传输,它不是任何类型的套接字(也许StringIO是一个数据库连接游标,也许是另一种协议).


那么,这只是一个很好的但是从未在文档中实现过的错误,或者实际上是否有一种方法可以实现自定义传输而无需修补猴子asyncio并牺牲第一胎?

Vin*_*ent 6

需要一些样板代码,尽管它不是太多:

from asyncio import streams, transports, get_event_loop


class CustomTransport(transports.Transport):

    def __init__(self, loop, protocol, *args, **kwargs):
        self._loop = loop
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def set_protocol(self, protocol):
        return self._protocol

    # Implement read/write methods

    # [...]


async def custom_create_connection(protocol_factory, *args, **kwargs):
    loop = get_event_loop()
    protocol = protocol_factory()
    transport = CustomTransport(loop, protocol, *args, **kwargs)
    return transport, protocol


async def custom_open_connection(*args, **kwargs):
    reader = streams.StreamReader()
    protocol = streams.StreamReaderProtocol(reader)
    factory = lambda: protocol
    transport, _ = await custom_create_connection(factory, *args, **kwargs)
    writer = streams.StreamWriter(transport, protocol, reader)
    return reader, writer
Run Code Online (Sandbox Code Playgroud)