wvx*_*xvw 4 python python-asyncio
我正在阅读包中的文档Protocol和Transport类asyncio.特别:
子类化协议类时,建议您覆盖某些方法.这些方法是回调:它们将由传输在某些事件上调用(例如,当收到某些数据时); 你不应该自己打电话,除非你正在实施运输.
强调补充说
因此,原则上应该可以实现运输,但......
传输是asyncio提供的类,用于抽象各种通信信道.您通常不会自己实例化运输; 相反,您将调用一个AbstractEventLoop方法 (哪一个?),它将创建传输并尝试启动底层通信通道,并在成功时回拨您.
再次强调了重点
阅读关于AbstractEventLoop的部分,我没有看到任何创建自定义传输的方法.它最接近的AbstractEventLoop.create_connection(protocol_factory, host=...)只是暗示它会创造某种插座......
好吧,我的最终目标是使用一个自定义传输,它不是任何类型的套接字(也许StringIO是一个数据库连接游标,也许是另一种协议).
那么,这只是一个很好的但是从未在文档中实现过的错误,或者实际上是否有一种方法可以实现自定义传输而无需修补猴子asyncio并牺牲第一胎?
需要一些样板代码,尽管它不是太多:
from asyncio import streams, transports, get_event_loop
class CustomTransport(transports.Transport):
def __init__(self, loop, protocol, *args, **kwargs):
self._loop = loop
self._protocol = protocol
def get_protocol(self):
return self._protocol
def set_protocol(self, protocol):
return self._protocol
# Implement read/write methods
# [...]
async def custom_create_connection(protocol_factory, *args, **kwargs):
loop = get_event_loop()
protocol = protocol_factory()
transport = CustomTransport(loop, protocol, *args, **kwargs)
return transport, protocol
async def custom_open_connection(*args, **kwargs):
reader = streams.StreamReader()
protocol = streams.StreamReaderProtocol(reader)
factory = lambda: protocol
transport, _ = await custom_create_connection(factory, *args, **kwargs)
writer = streams.StreamWriter(transport, protocol, reader)
return reader, writer
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
728 次 |
| 最近记录: |