带有同步代码的 asyncio

Ser*_*hik 3 python python-3.x async-await python-asyncio

我有一个模块可以阻止对某些 TCP 服务器的网络请求并接收响应。我必须将它集成到 asyncio 应用程序中。我的模块看起来像这样:

import socket

# class providing transport facilities
# can be implemented in any manner
# for example it can manage asyncio connection
class CustomTransport:
    def __init__(self, host, port):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.host = host
        self.port = port

    def write(self, data):
        left = len(data)
        while left:
            written = self.sock.send(data.encode('utf-8'))
            left = left - written
            data = data[written:]

    def read(self, sz):
        return self.sock.recv(sz)

    def open(self):
        self.sock.connect((self.host, self.port))

    def close(self):
        self.sock.shutdown(2)


# generated. shouldn't be modified
# however any transport can be passed
class HelloNetClient:
    def __init__(self, transport):
        self.transport = transport

    def say_hello_net(self):
        self.transport.write('hello')
        response = self.transport.read(5)
        return response


# can be modified
class HelloService:
    def __init__(self):
        # create transport for connection to echo TCP server
        self.transport = CustomTransport('127.0.0.1', 6789)
        self.hello_client = HelloNetClient(self.transport)

    def say_hello(self):
        print('Saying hello...')
        return self.hello_client.say_hello_net()

    def __enter__(self):
        self.transport.open()
        return self

    def __exit__(self,exc_type, exc_val, exc_tb):
        self.transport.close()
Run Code Online (Sandbox Code Playgroud)

用法:

def start_conversation():
    with HelloService() as hs:
        answer = hs.say_hello()
        print(answer.decode('utf-8'))


if __name__ == "__main__":
    start_conversation()
Run Code Online (Sandbox Code Playgroud)

现在我看到让我的模块与asyncio兼容的唯一方法是将所有内容转换为协程并用 asyncio 提供的传输替换常规套接字。但我不想接触生成的代码(HelloNetClient)。是否可以?

PS我希望它像这样使用:

async def start_conversation():
    async with HelloService() as hs:
        answer = await hs.say_hello()
        print(answer.decode('utf-8'))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(start_conversation())
Run Code Online (Sandbox Code Playgroud)

use*_*342 5

HelloService可能需要使用run_in_executor(管理线程池)HelloNetClient在后台运行方法。例如:

async def say_hello(self):
    print('Saying hello...')
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(self.hello_client.say_hello_net)
Run Code Online (Sandbox Code Playgroud)

这不是 asyncio 的惯用用法,并且您错过了它的一些功能 - 例如,您将无法创建数千个并行工作的客户端,并且您将无法获得可靠的取消(能够cancel()完成您希望的任何任务)。尽管如此,简单的使用还是可以的。

不幸的是,提供自定义传输的能力在这里没有帮助,因为中间层HelloNetClient期望同步行为。即使您要编写一个挂接到 asyncio 的自定义传输,像say_hello_net这样的方法仍然会等待响应到达所需的时间,因此HelloService必须在单独的线程中安排它们。出于这个原因,您最好的选择是使用默认传输并将带有 asyncio 的代码与服务中的代码连接,如上所示。