Tornado + aioredis:为什么我的 redis 调用阻塞?

Ein*_*din 2 tornado redis python-3.x async-await python-asyncio

我尝试在 Tornado 和 Redis 上构建一个具有两个 API 端点的简单系统:

  1. 从 Redis 读取值的 API,或等待该值存在(使用BRPOP: value = yield from redis.brpop("test")
  2. 写入此值的 API(带有LPUSH: redis.lpush("test", "the value"))。

所以我希望能够以任何顺序调用这些 API。事实上,如果我调用 2. 然后 1.,它会按预期工作,对 1. 的调用立即返回值。

问题是,如果我调用 1. 然后 2.,两个请求都会阻塞,永远不会返回。

同时,当请求阻塞时,我仍然可以LPUSH/BRPOP直接在 Redis 中,甚至可以使用相同的键。同样,我可以在 Tornado 中调用其他处理程序。所以我猜这个块既不在 Redis 也不在 Tornado 中,而是在我对 aioredis 的使用中?也许是异步循环?但我不明白我错在哪里。任何提示?

谢谢你的帮助。

这是我的代码:

import tornado.ioloop
import tornado.web
from tornado import web, gen
from tornado.options import options, define
import aioredis
import asyncio


class WaitValueHandler(tornado.web.RequestHandler):
    @asyncio.coroutine
    def get(self):
        redis = self.application.redis
        value = yield from redis.brpop("test")
        self.write("I received a value: %s" % value)


class WriteValueHandler(tornado.web.RequestHandler):
    @asyncio.coroutine
    def get(self):
        redis = self.application.redis
        res = yield from redis.lpush("test", "here is the value")
        self.write("Ok ")


class Application(tornado.web.Application):
    def __init__(self):
        tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOMainLoop')

        handlers = [
            (r"/get", WaitValueHandler),
            (r"/put", WriteValueHandler)
        ]

        super().__init__(handlers, debug=True)

    def init_with_loop(self, loop):
        self.redis = loop.run_until_complete(
            aioredis.create_redis(('localhost', 6379), loop=loop)
        )

if __name__ == "__main__":
    application = Application()
    application.listen(8888)

    loop = asyncio.get_event_loop()
    application.init_with_loop(loop)
    loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

Ein*_*din 5

好的,我明白了原因,正如文档所述

共享模式下的阻塞操作(如 blpop、brpop 或长时间运行的 LUA 脚本)会阻塞连接,从而可能导致整个程序故障。

通过对此类操作使用独占连接,可以轻松解决此阻塞问题:

redis = await aioredis.create_redis_pool(
    ('localhost', 6379),
    minsize=1,
    maxsize=1)

async def task():
   # Exclusive mode
   with await redis as r:
       await r.set('key', 'val')
asyncio.ensure_future(task())
asyncio.ensure_future(task())
# Both tasks will first acquire connection.
Run Code Online (Sandbox Code Playgroud)