Python 中的 Websockets 服务器推送

bli*_*web 1 python websocket

如何用 Python 编写一个 websockets 服务器,它只按时间间隔向所有连接的客户端推送数据,而不等待任何传入消息?

bli*_*web 6

我正在回答我自己的问题...

这是一个 Python websockets 服务器的工作示例,它每 5 秒向所有客户端发送一条消息。我写了这篇文章并设法让它工作,因为我在网上找不到这样的示例(2021 年 3 月)

希望这对其他人有帮助,如果有人对使用软件包添加 ssl 支持或订阅类型服务添加的改进或更好的解决方案有建议,请写在评论或答案部分。

import asyncio
import logging
import websockets
from websockets import WebSocketServerProtocol
import time
import threading

logging.basicConfig(level=logging.INFO)


class Server:

    clients = set()
    logging.info(f'starting up ...')

    def __init__(self):
        logging.info(f'init happened ...')
        
    async def register(self, ws: WebSocketServerProtocol) -> None:
        self.clients.add(ws)
        logging.info(f'{ws.remote_address} connects')

    async def unregister(self, ws: WebSocketServerProtocol) -> None:
        self.clients.remove(ws)
        logging.info(f'{ws.remote_address} disconnects')

    async def send_to_clients(self, message: str) -> None:
        if self.clients:
            logging.info("trying to send")
            await asyncio.wait([client.send(message) for client in self.clients])

    async def ws_handler(self, ws: WebSocketServerProtocol, url: str) -> None:
        await self.register(ws)
        try:
            await self.distribute(ws)
        finally:
            await self.unregister(ws)

    async def distribute(self, ws: WebSocketServerProtocol) -> None:
        async for message in ws:
            await self.send_to_clients(message)


async def timerThread(server,counter):
    counter = 0
    while True:
        await checkAndSend(server,counter)
        print("doing " + str(counter))
        time.sleep(5)
        counter = counter + 1

async def checkAndSend(server,counter):
    # check something
    # send message
    logging.info("in check and send")
    await server.send_to_clients("Hi there: " + str(counter))

# helper routine to allow thread to call async function
def between_callback(server,counter):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(timerThread(server,counter))
    loop.close()

# start server
server = Server()
start_server = websockets.serve(server.ws_handler,'localhost',4000)
counter = 0 

# start timer thread
threading.Thread(target=between_callback,args=(server,counter,)).start()

# start main event loop
loop = asyncio.get_event_loop()
loop.run_until_complete(start_server)
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

要查看此操作,您可以使用这个简单的 html 文件作为客户端,然后打开检查器以查看控制台日志上的传入消息。

<h1> Websocket Test </h1>
<script>
const ws = new WebSocket('ws://localhost:4000')
ws.onopen = () => {
  console.log('ws opened on browser')
  ws.send('hello world')
}
ws.onmessage = (message) => {
  console.log(`message received`, message.data)
}
</script>
Run Code Online (Sandbox Code Playgroud)

如果您使用的是 python 3.11,则应像这样重写以下函数:

async def send_to_clients(self, message: str) -> None:
    if self.clients:
        logging.info("trying to send")
        [await client.send(message) for client in self.clients]
Run Code Online (Sandbox Code Playgroud)