cat*_*cap 4 python multithreading websocket python-asyncio
我有一个使用 asyncio 和 websockets 的 python 套接字服务器。当 websocket 处于活动状态时,100 多个设备将连接并保持连接等待命令/消息。
有两个线程,第一个线程接受连接并将其详细信息添加到全局变量中,然后等待来自设备的消息:
async def thread1(websocket, path):
client_address = await websocket.recv()
CONNECTIONS[client_address] = websocket
async for message in websocket:
... do something with message
start_server = websockets.serve(thread1, host, port)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.ensure_future(thread2())
asyncio.get_event_loop().run_forever()
Run Code Online (Sandbox Code Playgroud)
第二个线程处理一些用户数据,一旦需要发送命令,它就会访问全局变量以获取 websocket 信息:
thread2()
...some data processing
soc = CONNECTIONS[ipaddress]
await soc.send("some message")
Run Code Online (Sandbox Code Playgroud)
我的问题:允许另一个线程发送消息的最佳方式是什么?
我可以使用线程锁定和仅用于处理该数据的函数来保留全局变量safe,但是全局变量并不理想。我无法在线程之间发送信息,因为 thread1 正在stuck等待接收消息。
我想说的第一件事是线程一词的错误使用。您使用asyncio这里使用的概念 -协程(协程被包装到异步任务中)。例如,可以在此处找到它与线程的不同之处。
服务器websockets为每个传入连接生成一个新任务(有相同数量的连接和生成的任务)。我没有看到全局对象有什么问题,至少在一个小脚本中是这样。然而,下面我给出了一个例子,我把它放在一个单独的类中。
此外,在这种情况下,协程之间不需要特殊的同步,因为它们是通过协作多任务实现的(事实上,所有协程都在一个线程中执行,在某些点转移控制权。)
这是一个简单的示例,其中服务器存储传入连接的字典,并每 2 秒启动一个任务,通知所有客户端并向它们发送当前时间。服务器还将客户端的确认信息打印到控制台。
# ws_server.py
import asyncio
import websockets
import datetime
class Server:
def __init__(self, host, port):
self.host = host
self.port = port
self.connections = {}
self.is_active = False
self.server = None
async def start(self):
self.is_active = True
self.server = await websockets.serve(self.handler, self.host, self.port)
asyncio.create_task(self.periodic_notifier())
async def stop(self):
self.is_active = False
self.server.close()
await self.wait_closed()
async def wait_closed(self):
await self.server.wait_closed()
async def handler(self, websocket, path):
self.connections[websocket.remote_address] = websocket
try:
async for message in websocket:
print(message)
except ConnectionClosedError as e:
pass
del self.connections[websocket.remote_address]
print(f"Connection {websocket.remote_address} is closed")
async def periodic_notifier(self):
while self.is_active:
await asyncio.gather(
*[ws.send(f"Hello time {datetime.datetime.now()}") for ws in self.connections.values()],
return_exceptions=True)
await asyncio.sleep(2)
async def main():
server = Server("localhost", 8080)
await server.start()
await server.wait_closed()
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
# ws_client.py
import asyncio
import websockets
async def client():
uri = "ws://localhost:8080"
async with websockets.connect(uri) as websocket:
async for message in websocket:
print(message)
await websocket.send(f"ACK {message}")
asyncio.run(client())
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
837 次 |
| 最近记录: |