Mic*_*ael 16 python redis python-asyncio aiohttp
我正在玩aiohttp,看看它将如何作为带有websocket连接的移动应用程序的服务器应用程序.
这是一个简单的"Hello world"示例(这里是gist):
import asyncio
import aiohttp
from aiohttp import web
class WebsocketEchoHandler:
@asyncio.coroutine
def __call__(self, request):
ws = web.WebSocketResponse()
ws.start(request)
print('Connection opened')
try:
while True:
msg = yield from ws.receive()
ws.send_str(msg.data + '/answer')
except:
pass
finally:
print('Connection closed')
return ws
if __name__ == "__main__":
app = aiohttp.web.Application()
app.router.add_route('GET', '/ws', WebsocketEchoHandler())
loop = asyncio.get_event_loop()
handler = app.make_handler()
f = loop.create_server(
handler,
'127.0.0.1',
8080,
)
srv = loop.run_until_complete(f)
print("Server started at {sock[0]}:{sock[1]}".format(
sock=srv.sockets[0].getsockname()
))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(handler.finish_connections(1.0))
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(app.finish())
loop.close()
Run Code Online (Sandbox Code Playgroud)
现在我想使用下面描述的结构(node server = python aiohttp).更具体地说,使用带有asyncio-redis的Redis Pub/Sub机制来读取和写入我的WebsocketEchoHandler中的 websocket连接和Redis .
WebsocketEchoHandler是一个简单的循环,所以我不确定应该怎么做.使用Tornado和brükva我会使用回调.
由于我已经使用Redis,我应采取以下两种方法中的哪一种:
图片来自http://goldfirestudios.com/blog/136/Horizontally-Scaling-Node.js-and-WebSockets-with-Redis
我似乎需要澄清一下.
Redis Pub/Sub处理程序可能如下所示:
class WebsocketEchoHandler:
@asyncio.coroutine
def __call__(self, request):
ws = web.WebSocketResponse()
ws.start(request)
connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
subscriber = yield from connection.start_subscribe()
yield from subscriber.subscribe(['ch1', 'ch2'])
print('Connection opened')
try:
while True:
msg = yield from subscriber.next_published()
ws.send_str(msg.value + '/answer')
except:
pass
finally:
print('Connection closed')
return ws
Run Code Online (Sandbox Code Playgroud)
该处理程序只订阅Redis通道ch1和ch2,并将从这些通道收到的每条消息发送到websocket.
我想要这个处理程序:
class WebsocketEchoHandler:
@asyncio.coroutine
def __call__(self, request):
ws = web.WebSocketResponse()
ws.start(request)
connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
subscriber = yield from connection.start_subscribe()
yield from subscriber.subscribe(['ch1', 'ch2'])
print('Connection opened')
try:
while True:
# If message recived from redis OR from websocket
msg_ws = yield from ws.receive()
msg_redis = yield from subscriber.next_published()
if msg_ws:
# push to redis / do something else
self.on_msg_from_ws(msg_ws)
if msg_redis:
self.on_msg_from_redis(msg_redis)
except:
pass
finally:
print('Connection closed')
return ws
Run Code Online (Sandbox Code Playgroud)
但是下面的代码总是按顺序调用,所以读取来自Redis的websocket块:
msg_ws = yield from ws.receive()
msg_redis = yield from subscriber.next_published()
Run Code Online (Sandbox Code Playgroud)我希望在事件是从两个来源之一收到的消息的事件上完成阅读.
dan*_*ano 23
您应该使用两个while循环 - 一个用于处理来自websocket的消息,另一个用于处理来自redis的消息.你的主要处理程序可以刚刚打完假两个协同程序,一个处理每个循环,然后等待两个人:
class WebsocketEchoHandler:
@asyncio.coroutine
def __call__(self, request):
ws = web.WebSocketResponse()
ws.start(request)
connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
subscriber = yield from connection.start_subscribe()
yield from subscriber.subscribe(['ch1', 'ch2'])
print('Connection opened')
try:
# Kick off both coroutines in parallel, and then block
# until both are completed.
yield from asyncio.gather(self.handle_ws(ws), self.handle_redis(subscriber))
except Exception as e: # Don't do except: pass
import traceback
traceback.print_exc()
finally:
print('Connection closed')
return ws
@asyncio.coroutine
def handle_ws(self, ws):
while True:
msg_ws = yield from ws.receive()
if msg_ws:
self.on_msg_from_ws(msg_ws)
@asyncio.coroutine
def handle_redis(self, subscriber):
while True:
msg_redis = yield from subscriber.next_published()
if msg_redis:
self.on_msg_from_redis(msg_redis)
Run Code Online (Sandbox Code Playgroud)
通过这种方式,您可以从两个潜在来源中的任何一个进行阅读,而无需关心另一个.