Python3 asyncio - 在自己的线程中运行两个服务器

Ser*_*zzo 2 python python-multithreading python-asyncio python-3.5

我的服务器类继承自BaseServer

class BaseServer(object):

    def __init__(self, host, port):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        self.instance = asyncio.start_server(self.handle_connection, host = host, port = port)

    async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
        pass

    def start(self):
        # wrapping coroutine into ensure_future to allow it to call from call_soon
        # wrapping into lambda to make it callable
        callback = asyncio.ensure_future(self.instance)
        self.loop.call_soon(lambda: callback)
        self.loop.run_forever()
        self.loop.close()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    @staticmethod
    def get_instance():
        return BaseServer(None, None)
Run Code Online (Sandbox Code Playgroud)

我需要两个在自己的线程中运行的服务器来并行处理请求。但是当我尝试根据需要运行它们时,只有第一台服务器正在运行。下面是我如何运行它们:

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def run():
        pool = ThreadPoolExecutor(max_workers=cpu_count())

        await loop.run_in_executor(pool, Server1.get_instance().start)
        await loop.run_in_executor(pool, Server2.get_instance().start)

    loop.run_until_complete(run())
Run Code Online (Sandbox Code Playgroud)
  1. 我究竟做错了什么?如何在自己的线程中运行每个服务器?
  2. 当我asyncio.set_event_loop打电话时def __init__遇到下一个错误:

RuntimeError:线程“Thread-1”中没有当前事件循环。

但如果我将其删除asyncio.set_event_loop并将def __init__其移至def start错误就会消失。为什么会发生这样的事?

Vin*_*ent 5

跟进OP的评论:

但是在同一个循环中通过 run_until_complete 运行两个服务器会阻止其中一个服务器处理对另一个服务器的请求的循环,不是吗?如何正确运行两台服务器?

以下是来自 python 3.5 的 asyncio 文档的TCP 服务器示例的修改版本:

# Start server 1
coro1 = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
server1 = loop.run_until_complete(coro1)
print('Serving 1 on {}'.format(server1.sockets[0].getsockname()))

# Start server 2
coro2 = asyncio.start_server(handle_echo, '127.0.0.1', 8889, loop=loop)
server2 = loop.run_until_complete(coro2)
print('Serving 2 on {}'.format(server2.sockets[0].getsockname()))

# Serve requests until Ctrl+C is pressed
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the servers
server1.close()
loop.run_until_complete(server1.wait_closed())
server2.close()
loop.run_until_complete(server2.wait_closed())

# Close the loop
loop.close()
Run Code Online (Sandbox Code Playgroud)

请注意,随着 python 3.7 添加 asyncio,它看起来会好得多

async def main():
    server1 = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addr1 = server1.sockets[0].getsockname()
    print(f'Serving 1 on {addr1}')

    server2 = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8889)

    addr2 = server2.sockets[0].getsockname()
    print(f'Serving 2 on {addr2}')

    async with server1, server2:
        await asyncio.gather(
            server1.serve_forever(), server2.serve_forever())

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

  • `gather` 捕获了等待直到两台服务器停止服务的*意图*,而 `await server1.serve_forever()` 则让我们看起来像是在等待 `server1` 并且不关心 `server2`。另一种选择是将“asyncio.wait”与“FIRST_COMPLETED”一起使用并取消另一个选项,但这需要输入太多内容。实际上,这些差异很小,因为预计两台服务器都不会单独停止服务。 (2认同)