KDM*_*KDM 5 python python-3.x python-asyncio
我在Python 3.4中使用Asyncio,我将尝试解释我在这一点上正在做的事情以及导致问题的原因。
一方面,我有一个带阻塞操作的UDP连接框架,我要从该流中获取数据并创建以SSE格式传递给客户端的json。这一切都很好。
我遇到的问题是,如果我什么也不做,则无法正确处理客户端断开连接,而客户端断开连接,我将开始遇到此错误:
WARNING [selector_events:613] socket.send() raised exception.
Run Code Online (Sandbox Code Playgroud)
由于循环仍在运行,因此我一直在研究彻底打破循环并触发.close()的方法,但是我遇到的示例问题令我无法解决,而且在线资源也不多。
似乎实际可行的一个示例是尝试从客户端读取一行,并且如果该字符串为空字符串,则表示客户端已断开连接。
while True:
data = (yield from client_reader.readline())
if not data: #client disconnected
break
Run Code Online (Sandbox Code Playgroud)
但是,在大约十条消息之后,所有发送给客户端的消息都停止了,我认为这是因为如果我关闭了客户端,则挂起后,它挂在了“数据=(来自client_reader.readline()的产量)”上,那么它确实关闭了,并且“结束连接”确实被调用。任何想法为什么它可能会挂起?我认为目前我对Asyncio有一个很好的处理,但是这个让我感到困惑。
注意:location()和status()是我从UDP套接字获取信息的两个调用-我已经使用相同的代码成功运行了多个小时而没有出现问题-减去了客户端断开连接线。
clients = {}
def accept_client(client_reader, client_writer):
task = asyncio.Task(handle_client(client_reader, client_writer))
clients[task] = (client_writer)
def client_done(task):
del clients[task]
client_writer.close()
log.info("End Connection")
log.info("New Connection")
task.add_done_callback(client_done)
@asyncio.coroutine
def handle_client(client_reader, client_writer):
data = {'result':{'status':'Connection Ready'}}
yield from postmessage(data,client_writer)
while True:
data = (yield from client_reader.readline())
if not data: #client disconnected
break
data = yield from asyncio.wait_for(location(),
timeout=1.0)
yield from postmessage(data,client_writer)
data = yield from asyncio.wait_for(status(),
timeout=1.0)
yield from postmessage(data,client_writer)
@asyncio.coroutine
def postmessage(data, client_writer):
mimetype=('text/event-stream')
response = ('data: {0}\n\n'.format(data).encode('utf-8'))
client_writer.write(response)
client_writer.drain()
Run Code Online (Sandbox Code Playgroud)
更新:如果我在“来自client_reader的收益”上添加了超时,当它到达通常会挂起的地步时,我会收到以下错误。
2014-11-17 03:13:56,214 INFO [try:23] End Connection
2014-11-17 03:13:56,214 ERROR [base_events:912] Task exception was never retrieved
future: <Task finished coro=<handle_client() done, defined at try.py:29> exception=TimeoutError()>
Traceback (most recent call last):
File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 236, in _step
result = next(coro)
File "try.py", line 35, in handle_client
timeout=1.0))
File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 375, in wait_for
raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
Run Code Online (Sandbox Code Playgroud)
这是一个示例脚本,显示了实际的错误-只需在python 3.4.2中运行它,经过9次迭代后,它将暂停从客户端读取内容。
(脚本已完成,因此您可以运行它以亲自查看)
import asyncio
import logging
import json
import time
log = logging.getLogger(__name__)
clients = {}
def accept_client(client_reader, client_writer):
task = asyncio.Task(handle_client(client_reader, client_writer))
clients[task] = (client_writer)
def client_done(task):
del clients[task]
client_writer.close()
log.info("End Connection")
log.info("New Connection")
task.add_done_callback(client_done)
@asyncio.coroutine
def handle_client(client_reader, client_writer):
data = {'result':{'status':'Connection Ready'}}
postmessage(data,client_writer)
count = 0
while True:
data = (yield from asyncio.wait_for(client_reader.readline(),timeout=1.0))
if not data: #client disconnected
break
data = yield from asyncio.wait_for(test1(),timeout=1.0)
yield from postmessage(data,client_writer)
data = yield from asyncio.wait_for(test2(),timeout=1.0)
yield from postmessage(data,client_writer)
@asyncio.coroutine
def postmessage(data, client_writer):
mimetype=('text/event-stream')
response = ('data: {0}\n\n'.format(data).encode('utf-8'))
client_writer.write(response)
client_writer.drain()
@asyncio.coroutine
def test1():
data = {'result':{
'test1':{ }
}
}
data = json.dumps(data)
return data
@asyncio.coroutine
def test2():
data = {'result':{
'test2':{ }
}
}
data = json.dumps(data)
return data
def main():
loop = asyncio.get_event_loop()
f = asyncio.start_server(accept_client, host=None, port=2991)
loop.run_until_complete(f)
loop.run_forever()
if __name__ == '__main__':
log = logging.getLogger("")
formatter = logging.Formatter("%(asctime)s %(levelname)s " +
"[%(module)s:%(lineno)d] %(message)s")
# log the things
log.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
log.addHandler(ch)
main()
Run Code Online (Sandbox Code Playgroud)
另一个更新:我发现它死了,因为它从客户端的标头读取了所有行,然后在行数用完时超时。我认为,我正在寻找的真正答案是当您实际上不需要从客户端(初始连接之外)接收数据时如何检测客户端断开连接。
好的,我想我理解您的问题。您正在从客户端读取信息,只是要了解客户端是否已断开连接,但是一旦客户端发送了其标头,readline()客户端仍处于连接状态时,它将无限期地阻塞,这将阻止您实际进行任何工作。使用超时来避免阻塞是可以的,您只需要处理TimeoutError,因为发生这种情况时,您可以假设客户端没有断开连接:
from concurrent.futures import TimeoutError
@asyncio.coroutine
def handle_client(client_reader, client_writer):
data = {'result':{'status':'Connection Ready'}}
postmessage(data,client_writer)
count = 0
while True:
try:
# See if client has disconnected.
data = (yield from asyncio.wait_for(client_reader.readline(),timeout=0.01))
if not data: # Client disconnected
break
except TimeoutError:
pass # Client hasn't disconnected.
data = yield from asyncio.wait_for(test1(),timeout=1.0)
yield from postmessage(data,client_writer)
data = yield from asyncio.wait_for(test2(),timeout=1.0)
yield from postmessage(data,client_writer)
Run Code Online (Sandbox Code Playgroud)
请注意,我在这里使超时非常短,因为我们真的根本不想阻塞,我们只想知道连接是否已关闭。
但是,一个更好的解决办法是没有明确检查,看看是否连接已经关闭,而是处理你得到什么异常,当你试图通过套接字发送数据时,连接已被关闭:
@asyncio.coroutine
def handle_client(client_reader, client_writer):
data = {'result':{'status':'Connection Ready'}}
postmessage(data,client_writer)
count = 0
while True:
try:
data = yield from asyncio.wait_for(test1(),timeout=1.0)
yield from postmessage(data,client_writer)
data = yield from asyncio.wait_for(test2(),timeout=1.0)
yield from postmessage(data,client_writer)
except ConnectionResetError: # And/or whatever other exceptions you see.
break
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3691 次 |
| 最近记录: |