Jor*_* H. 8 python-3.x python-asyncio
我正在开发一个 Python-3 程序,它试图做两件事:(1)从外部 websocket(非阻塞)读取数据(类型 1)和(2)在常规 UDP 上接收数据(类型 2)套接字(非阻塞)
有很长一段时间,websocket 和 UDP 套接字上都没有数据。因此,我试图对两种数据类型进行非阻塞读取/接收。我正在尝试使用 Asyncio 和 Websockets 为 websocket 执行此操作。
不幸的是,只要 websocket 上没有数据(类型 1),下面的代码就会挂起。它阻止了其余代码的执行。我究竟做错了什么?
在此先感谢您的帮助。
import asyncio
import websockets
import socket
IP_STRATUX = "ws://192.168.86.201/traffic"
# Method to retrieve data (Type 1) from websocket
async def readWsStratux(inURL):
async with websockets.connect(inURL, close_timeout=0.1) as websocket:
try:
data = await websocket.recv()
return data
except websocket.error:
return None
if __name__ == "__main__":
# Socket to receive data (Type 2)
sockPCC = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sockPCC.bind((PCC_IP, PCC_PORT))
sockPCC.setblocking(0)
while True:
print('-----MAIN LOOP-----')
data1 = asyncio.get_event_loop().run_until_complete(
readWsStratux(IP_STRATUX))
print(f'Data 1: {data1}')
data2, addr = sockPCC.recvfrom(1024)
print(f'Data 2: {data2}')
Run Code Online (Sandbox Code Playgroud)
问题是按照run_until_complete它说的做,即运行提供的协程,直到它返回。您需要创建一个协程来生成两个独立的任务,每个任务在“后台”运行一个自己的协程。一个任务将处理从 websocket 读取,另一个任务是 UDP 数据。两个协程都可以提供一个队列,您的主协程从中读取。
websocket 协程看起来与您已有的非常相似,但将无限循环推入协程,并将数据传输到调用方提供的队列中:
async def readWsStratux(inURL, queue):
while True:
async with websockets.connect(inURL, close_timeout=0.1) as ws:
try:
data = await ws.recv()
await queue.put(('websocket', data))
except websockets.error:
return None
Run Code Online (Sandbox Code Playgroud)
接下来,您将需要一个类似的协程来执行 UDP。与其手动创建非阻塞套接字,不如使用 asyncio对 UDP的支持。您可以从文档中的示例类的简化版本开始:
class ClientProtocol:
def __init__(self, queue):
self.queue = queue
def datagram_received(self, data, addr):
self.queue.put_nowait(('udp', data))
def connection_lost(self, exc):
self.queue.put_nowait(('udp', b''))
async def read_udp(queue):
transport, protocol = await loop.create_datagram_endpoint(
lambda: ClientProtocol(queue),
remote_addr=(PCC_IP, PCC_PORT))
# wait until canceled
try:
await asyncio.get_event_loop().create_future()
except asyncio.CancelledError:
transport.close()
raise
Run Code Online (Sandbox Code Playgroud)
有了这两个,您就可以编写主协程来生成它们并在它们运行时从队列中收集数据:
async def read_both(in_url):
queue = asyncio.Queue()
# spawn two workers in parallel, and have them send
# data to our queue
ws_task = asyncio.create_task(readWsStratux(in_url, queue))
udp_task = asyncio.create_task(read_udp(queue))
while True:
source, data = await queue.get()
if source == 'ws':
print('from websocket', data)
elif source == 'udp':
if data == b'':
break # lost the UDP connection
print('from UDP', data)
# terminate the workers
ws_task.cancel()
udp_task.cancel()
Run Code Online (Sandbox Code Playgroud)
您的主程序现在包含一个简单的调用read_both:
if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(read_both(IP_STRATUX))
Run Code Online (Sandbox Code Playgroud)
请注意,上述代码未经测试,可能包含拼写错误。
| 归档时间: |
|
| 查看次数: |
2943 次 |
| 最近记录: |