wea*_*rog 24 python websocket python-3.x python-asyncio
我正在使用该websockets库在Python 3.4中创建一个websocket服务器.这是一个简单的echo服务器:
import asyncio
import websockets
@asyncio.coroutine
def connection_handler(websocket, path):
while True:
msg = yield from websocket.recv()
if msg is None: # connection lost
break
yield from websocket.send(msg)
start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
Run Code Online (Sandbox Code Playgroud)
假设我们 - 另外 - 想要在某些事件发生时向客户端发送消息.为简单起见,让我们每隔60秒定期发送一条消息.我们怎么做?我的意思是,因为connection_handler经常等待收到的消息,服务器只能在收到客户端的消息后才采取行动,对吗?我在这里错过了什么?
也许这种情况需要一个基于事件/回调的框架,而不是一个基于协同程序的框架?龙卷风?
Yar*_*min 28
TL; DR用于asyncio.ensure_future()同时运行多个协同程序.
也许这种情况需要一个基于事件/回调的框架,而不是一个基于协同程序的框架?龙卷风?
不,你不需要任何其他框架.异步应用程序vs同步的整个想法是它在等待结果时不会阻塞.使用协同程序或回调并不重要.
我的意思是,因为connection_handler一直在等待传入消息,所以服务器只能在收到来自客户端的消息后才采取行动,对吗?我在这里错过了什么?
在同步应用程序中,您将编写类似的内容msg = websocket.recv(),这将阻止整个应用程序,直到您收到消息(如您所述).但在异步应用程序中它完全不同.
当你这样做时,msg = yield from websocket.recv()你会说:暂停执行connection_handler()直到websocket.recv()会产生一些东西.使用yield from内部协同程序将控制权返回给事件循环,因此可以执行其他一些代码,而我们正在等待结果websocket.recv().请参阅文档以更好地了解协程如何工作.
假设我们 - 另外 - 想要在某些事件发生时向客户端发送消息.为简单起见,让我们每隔60秒定期发送一条消息.我们怎么做?
asyncio.async()在执行阻塞调用以启动事件循环之前,您可以使用它来运行任意数量的协同程序.
import asyncio
import websockets
# here we'll store all active connections to use for sending periodic messages
connections = []
@asyncio.coroutine
def connection_handler(connection, path):
connections.append(connection) # add connection to pool
while True:
msg = yield from connection.recv()
if msg is None: # connection lost
connections.remove(connection) # remove connection from pool, when client disconnects
break
else:
print('< {}'.format(msg))
yield from connection.send(msg)
print('> {}'.format(msg))
@asyncio.coroutine
def send_periodically():
while True:
yield from asyncio.sleep(5) # switch to other code and continue execution in 5 seconds
for connection in connections:
print('> Periodic event happened.')
yield from connection.send('Periodic event happened.') # send message to each connected client
start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.async(send_periodically()) # before blocking call we schedule our coroutine for sending periodic messages
asyncio.get_event_loop().run_forever()
Run Code Online (Sandbox Code Playgroud)
这是一个示例客户端实现.它要求您输入名称,从echo服务器接收它,等待来自服务器的两个消息(这是我们的定期消息)并关闭连接.
import asyncio
import websockets
@asyncio.coroutine
def hello():
connection = yield from websockets.connect('ws://localhost:8000/')
name = input("What's your name? ")
yield from connection.send(name)
print("> {}".format(name))
for _ in range(3):
msg = yield from connection.recv()
print("< {}".format(msg))
yield from connection.close()
asyncio.get_event_loop().run_until_complete(hello())
Run Code Online (Sandbox Code Playgroud)
重点:
asyncio.async()中重命名为asyncio.ensure_future().Cyr*_* N. 20
我很惊讶gather没有提到。
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
async def main():
# Schedule three calls *concurrently*:
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
asyncio.run(main())
# Expected output:
#
# Task A: Compute factorial(2)...
# Task B: Compute factorial(2)...
# Task C: Compute factorial(2)...
# Task A: factorial(2) = 2
# Task B: Compute factorial(3)...
# Task C: Compute factorial(3)...
# Task B: factorial(3) = 6
# Task C: Compute factorial(4)...
# Task C: factorial(4) = 24
Run Code Online (Sandbox Code Playgroud)
同样的问题,直到我在这里看到完美的样品才能得到解决方案:http://websockets.readthedocs.io/en/stable/intro.html#both
done, pending = await asyncio.wait(
[listener_task, producer_task],
return_when=asyncio.FIRST_COMPLETED) # Important
Run Code Online (Sandbox Code Playgroud)
所以,我可以处理多个协程任务,如心跳和redis订阅.