Lia*_*con 4 client-server python-3.x python-asyncio microservices server
我很难理解更改后的回显服务器的行为,该服务器试图利用python 3的asyncio模块。
本质上,我有一个无限循环(可以说,在建立连接后,我想无限地将一些数据从服务器流式传输到客户端),例如MyServer.py:
#! /usr/bin/python3
import asyncio
import os
import time
class MyProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def connection_lost(self, exc):
asyncio.get_event_loop().stop()
def data_received(self, data):
i = 0
while True:
self.transport.write(b'>> %i' %i)
time.sleep(2)
i+=1
loop = asyncio.get_event_loop()
coro = loop.create_server(MyProtocol,
os.environ.get('MY_SERVICE_ADDRESS', 'localhost'),
os.environ.get('MY_SERVICE_PORT', 8100))
server = loop.run_until_complete(coro)
try:
loop.run_forever()
except:
loop.run_until_complete(server.wait_closed())
finally:
loop.close()
Run Code Online (Sandbox Code Playgroud)
接下来,当我连接nc ::1 8100并发送一些文本(例如“测试”)时,我得到以下信息:
user@machine$ nc ::1 8100
*** Connection from('::1', 58503, 0, 0) ***
testing
>> 1
>> 2
>> 3
^C
Run Code Online (Sandbox Code Playgroud)
现在,当我尝试nc再次使用进行连接时,没有收到任何欢迎消息,并且在尝试向服务器发送一些新文本后,我得到了以下错误的无尽信息:
user@machine$ nc ::1 8100
Is there anybody out there?
socket.send() raised exception
socket.send() raised exception
...
^C
Run Code Online (Sandbox Code Playgroud)
只是给伤口加盐,socket.send() raised exception消息继续向我的终端发送垃圾邮件,直到我杀死python服务器进程为止。
由于我是Web技术的新手(成为台式恐龙的时间太长了!),我不确定为什么会出现上述行为,也不清楚如何产生预期的行为,这很松散看起来像这样:
任何启发都将受到欢迎!
代码有多个问题。
首先,data_received 永远不会回来。在传输/协议级别,异步编程是单线程且基于回调。应用程序代码分散在各种回调中,例如data_received,事件循环运行show,监视文件描述符并根据需要调用回调。每个回调只允许执行简短的计算,调用传输方法并安排进一步的回调执行。回调不能做的是花费大量时间来完成或阻止等待。while永不退出的循环特别糟糕,因为它根本不允许事件循环运行。
这就是为什么代码仅在客户端断开连接后才会吐出异常的原因:connection_lost永远不会调用。它应该由事件循环调用,永不返回data_received不会使事件循环有恢复的机会。在事件循环被阻止的情况下,该程序无法响应其他客户端,并data_received继续尝试将数据发送到断开连接的客户端,并记录失败的情况。
表达想法的正确方法如下所示:
def data_received(self, data):
self.i = 0
loop.call_soon(self.write_to_client)
def write_to_client(self):
self.transport.write(b'>> %i' % self.i)
self.i += 1
loop.call_later(2, self.write_to_client)
Run Code Online (Sandbox Code Playgroud)
请注意两者是如何进行的,data_received并且write_to_client只需做很少的工作即可快速返回。没有调用time.sleep(),也绝对没有无限循环-对的递归调用中隐藏了“循环” write_to_client。
此更改揭示了代码中的第二个问题。它MyProtocol.connection_lost停止整个事件循环并退出程序。这使程序无法响应第二个客户端。解决方法是替换loop.stop()为在connection_lost以下位置设置标志:
def data_received(self, data):
self._done = False
self.i = 0
loop.call_soon(self.write_to_client)
def write_to_client(self):
if self._done:
return
self.transport.write(b'>> %i' % self.i)
self.i += 1
loop.call_later(2, self.write_to_client)
def connection_lost(self, exc):
self._done = True
Run Code Online (Sandbox Code Playgroud)
这允许多个客户端连接。
协程允许编写看起来很自然的代码,其中包含循环,并且看起来好像包含阻塞调用,这些代码在后台被转换为暂停点,使事件循环得以恢复。使用流,来自问题的代码如下所示:
async def talk_to_client(reader, writer):
peername = writer.get_extra_info('peername')
print('Connection from {}'.format(peername))
data = await reader.read(1024)
i = 0
while True:
writer.write(b'>> %i' % i)
await writer.drain()
await asyncio.sleep(2)
i += 1
loop = asyncio.get_event_loop()
coro = asyncio.start_server(talk_to_client,
os.environ.get('MY_SERVICE_ADDRESS', 'localhost'),
os.environ.get('MY_SERVICE_PORT', 8100))
server = loop.run_until_complete(coro)
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
talk_to_client看起来非常类似于的原始实现data_received,但没有缺点。await如果数据不可用,则会在事件使用的每个点恢复事件循环。time.sleep(n)被替换await asyncio.sleep(n)为loop.call_later(n, <resume current coroutine>)。等待将writer.drain()确保协程在对等方无法处理其获取的输出时暂停,并确保在对等方断开连接时引发异常。