Python Asyncio & Websocket 如何避免全局变量?

cat*_*cap 4 python multithreading websocket python-asyncio

我有一个使用 asyncio 和 websockets 的 python 套接字服务器。当 websocket 处于活动状态时,100 多个设备将连接并保持连接等待命令/消息。

有两个线程,第一个线程接受连接并将其详细信息添加到全局变量中,然后等待来自设备的消息:

async def thread1(websocket, path):
    client_address = await websocket.recv()
    CONNECTIONS[client_address] = websocket
    async for message in websocket:
        ... do something with message

start_server = websockets.serve(thread1, host, port)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.ensure_future(thread2())
asyncio.get_event_loop().run_forever()
Run Code Online (Sandbox Code Playgroud)

第二个线程处理一些用户数据,一旦需要发送命令,它就会访问全局变量以获取 websocket 信息:

thread2()
    ...some data processing 
    soc = CONNECTIONS[ipaddress] 
    await soc.send("some message")
Run Code Online (Sandbox Code Playgroud)

我的问题:允许另一个线程发送消息的最佳方式是什么?

我可以使用线程锁定和仅用于处理该数据的函数来保留全局变量safe,但是全局变量并不理想。我无法在线程之间发送信息,因为 thread1 正在stuck等待接收消息。

ale*_*ame 6

我想说的第一件事是线程一词的错误使用。您使用asyncio这里使用的概念 -协程(协程被包装到异步任务中)。例如,可以在此处找到它与线程的不同之处。

服务器websockets为每个传入连接生成一个新任务(有相同数量的连接和生成的任务)。我没有看到全局对象有什么问题,至少在一个小脚本中是这样。然而,下面我给出了一个例子,我把它放在一个单独的类中。

此外,在这种情况下,协程之间不需要特殊的同步,因为它们是通过协作多任务实现的(事实上,所有协程都在一个线程中执行,在某些点转移控制权。)

这是一个简单的示例,其中服务器存储传入连接的字典,并每 2 秒启动一个任务,通知所有客户端并向它们发送当前时间。服务器还将客户端的确认信息打印到控制台。

# ws_server.py
import asyncio
import websockets
import datetime


class Server:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.connections = {}
        self.is_active = False
        self.server = None

    async def start(self):
        self.is_active = True
        self.server = await websockets.serve(self.handler, self.host, self.port)
        asyncio.create_task(self.periodic_notifier())

    async def stop(self):
        self.is_active = False
        self.server.close()
        await self.wait_closed()

    async def wait_closed(self):
        await self.server.wait_closed()

    async def handler(self, websocket, path):
        self.connections[websocket.remote_address] = websocket
        try:
            async for message in websocket:
                print(message)
        except ConnectionClosedError as e:
            pass
        del self.connections[websocket.remote_address]
        print(f"Connection {websocket.remote_address} is closed")

    async def periodic_notifier(self):
        while self.is_active:
            await asyncio.gather(
                *[ws.send(f"Hello time {datetime.datetime.now()}") for ws in self.connections.values()],
                return_exceptions=True)
            await asyncio.sleep(2)


async def main():
    server = Server("localhost", 8080)
    await server.start()
    await server.wait_closed()

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
# ws_client.py
import asyncio
import websockets


async def client():
    uri = "ws://localhost:8080"
    async with websockets.connect(uri) as websocket:
        async for message in websocket:
            print(message)
            await websocket.send(f"ACK {message}")


asyncio.run(client())

Run Code Online (Sandbox Code Playgroud)