# server.py
from typing import Any
import socketio
import uvicorn
from fastapi import FastAPI
# Socket.IO server created in ASGI mode. It is annotated as Any, so static
# type checkers do not validate attribute access on it.
sio: Any = socketio.AsyncServer(async_mode="asgi")
# ASGI application that wraps the Socket.IO server.
socket_app = socketio.ASGIApp(sio)
# FastAPI application that owns the regular HTTP routes.
app = FastAPI()
# HTTP GET /test: writes "test" to stdout and answers with a fixed string.
# Documented with comments instead of a docstring so the generated API
# description of the route stays unchanged.
@app.get("/test")
async def test():
    reply = "WORKS"
    print("test")
    return reply
# Mount the Socket.IO ASGI app at the root path of the FastAPI app. This runs
# after the /test route has been registered on `app`.
app.mount("/", socket_app)
@sio.on("connect")
async def connect(sid, env):
    """Log that a "connect" event was received.

    Neither ``sid`` nor ``env`` is used; the handler only prints a marker.
    """
    print("on connect")
@sio.on("direct")
async def direct(sid, msg):
    """Handle a "direct" event by sending the payload back to one client.

    Args:
        sid: Id passed by the server for this event; used as the target room.
        msg: Payload that arrived with the event.
    """
    print(f"direct {msg}")
    # Passing the sid as the room addresses that single client only.
    await sio.emit("event_name", data=msg, room=sid)
@sio.on("broadcast")
async def broadcast(sid, msg):
    """Handle a "broadcast" event by re-emitting the payload without a target.

    Args:
        sid: Id passed by the server for this event (unused).
        msg: Payload that arrived with the event.
    """
    print(f"broadcast {msg}")
    # No room is given, so the event goes out to everyone.
    await sio.emit("event_name", data=msg)
@sio.on("disconnect")
async def disconnect(sid):
    """Log that a "disconnect" event was received; ``sid`` is not used."""
    print("on disconnect")
if __name__ == "__main__":
    # Start the development server when this file is run as a script.
    #
    # The former ``debug=True`` option is intentionally not passed: current
    # uvicorn releases no longer accept it and raise a TypeError for the
    # unexpected keyword. ``reload=True`` keeps auto-reload on code changes,
    # which is why the app is given as the import string "server:app".
    uvicorn.run("server:app", host="0.0.0.0", port=5000, reload=True)
Run Code Online (Sandbox Code Playgroud)
# client.py
import requests
import socketio
# Call the plain HTTP route first; the server prints "test" when it handles it.
r = requests.get("http://127.0.0.1:5000/test")
# Two independent Socket.IO clients, used below to compare the "direct" and
# "broadcast" events.
cl = socketio.Client()
cl2 = socketio.Client()
@cl.on("event_name")
def foo(data):
    """Print an "event_name" payload received by the first client."""
    print(f"client 1 {data}")
@cl2.on("event_name")
def foo2(data):
    """Print an "event_name" payload received by the second client."""
    print(f"client 2 {data}")
# Open both connections; the server prints "on connect" for each of them.
cl.connect("http://127.0.0.1:5000/")
cl2.connect("http://127.0.0.1:5000/")
# The server answers "direct" to the sender only: prints "client 1 msg_1".
cl.emit("direct", "msg_1")
# The server re-emits "broadcast" to everyone: prints "client 2 msg_2" and
# "client 1 msg_2".
cl2.emit("broadcast", "msg_2")
Run Code Online (Sandbox Code Playgroud)
最后安装适当的依赖项:
# server.py
pip install python-socketio uvicorn fastapi
# client.py
pip install python-socketio requests websocket-client
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5374 次 |
| 最近记录: |