Bur*_*ede 8 python tornado publish-subscribe redis
我正在使用Redis以及我的Tornado应用程序与asyc客户端Brukva,当我查看Brukva站点上的示例应用程序时,他们正在websocket中的" init "方法上建立新连接
class MessagesCatcher(tornado.websocket.WebSocketHandler):
def __init__(self, *args, **kwargs):
super(MessagesCatcher, self).__init__(*args, **kwargs)
self.client = brukva.Client()
self.client.connect()
self.client.subscribe('test_channel')
def open(self):
self.client.listen(self.on_message)
def on_message(self, result):
self.write_message(str(result.body))
def close(self):
self.client.unsubscribe('test_channel')
self.client.disconnect()
Run Code Online (Sandbox Code Playgroud)
在websocket的情况下很好但是如何在常见的Tornado RequestHandler post方法中处理它说长轮询操作(发布 - 订阅模型).我在更新处理程序的每个post方法中创建新的客户端连接这是正确的方法吗?当我在redis控制台上检查时,我看到每个新的post操作都会增加客户端.

这是我的代码示例.
c = brukva.Client(host = '127.0.0.1')
c.connect()
class MessageNewHandler(BaseHandler):
@tornado.web.authenticated
def post(self):
self.listing_id = self.get_argument("listing_id")
message = {
"id": str(uuid.uuid4()),
"from": str(self.get_secure_cookie("username")),
"body": str(self.get_argument("body")),
}
message["html"] = self.render_string("message.html", message=message)
if self.get_argument("next", None):
self.redirect(self.get_argument("next"))
else:
c.publish(self.listing_id, message)
logging.info("Writing message : " + json.dumps(message))
self.write(json.dumps(message))
class MessageUpdatesHandler(BaseHandler):
@tornado.web.authenticated
@tornado.web.asynchronous
def post(self):
self.listing_id = self.get_argument("listing_id", None)
self.client = brukva.Client()
self.client.connect()
self.client.subscribe(self.listing_id)
self.client.listen(self.on_new_messages)
def on_new_messages(self, messages):
# Closed client connection
if self.request.connection.stream.closed():
return
logging.info("Getting update : " + json.dumps(messages.body))
self.finish(json.dumps(messages.body))
self.client.unsubscribe(self.listing_id)
def on_connection_close(self):
# unsubscribe user from channel
self.client.unsubscribe(self.listing_id)
self.client.disconnect()
Run Code Online (Sandbox Code Playgroud)
如果您提供类似案例的示例代码,我将不胜感激.
Ana*_*nth 11
有点晚了,但我一直在使用龙卷风.它适用于龙卷风的ioloop和tornado.gen模块
安装tornadoredis
它可以从pip安装
pip install tornadoredis
Run Code Online (Sandbox Code Playgroud)
或者使用setuptools
easy_install tornadoredis
Run Code Online (Sandbox Code Playgroud)
但你真的不应该这样做.您还可以克隆存储库并将其解压缩.然后跑
python setup.py build
python setup.py install
Run Code Online (Sandbox Code Playgroud)
连接到redis
以下代码包含在main.py或同等代码中
redis_conn = tornadoredis.Client('hostname', 'port')
redis_conn.connect()
Run Code Online (Sandbox Code Playgroud)
redis.connect只调用一次.这是一个阻塞调用,所以应该在启动主ioloop之前调用它.所有处理程序之间共享相同的连接对象.
您可以将其添加到您的应用程序设置中
settings = {
redis = redis_conn
}
app = tornado.web.Application([('/.*', Handler),],
**settings)
Run Code Online (Sandbox Code Playgroud)
使用tornadoredis
连接可以在处理程序中使用,self.settings['redis']也可以作为BaseHandler类的属性添加.您的请求处理程序将该类子类化并访问该属性.
class BaseHandler(tornado.web.RequestHandler):
@property
def redis():
return self.settings['redis']
Run Code Online (Sandbox Code Playgroud)
要与redis通信,使用tornado.web.asynchronous和tornado.gen.engine装饰器
class SomeHandler(BaseHandler):
@tornado.web.asynchronous
@tornado.gen.engine
def get(self):
foo = yield gen.Task(self.redis.get, 'foo')
self.render('sometemplate.html', {'foo': foo}
Run Code Online (Sandbox Code Playgroud)
额外的信息
可以在github repo上找到更多示例和其他功能,如连接池和管道.
您应该在应用程序中汇集连接。因为 brukva 似乎不自动支持这一点(redis-py 支持这一点,但本质上是阻塞的,所以它与龙卷风不兼容),所以您需要编写自己的连接池。
不过,模式非常简单。类似的东西(这不是真正的操作代码):
class BrukvaPool():
__conns = {}
def get(host, port,db):
''' Get a client for host, port, db '''
key = "%s:%s:%s" % (host, port, db)
conns = self.__conns.get(key, [])
if conns:
ret = conns.pop()
return ret
else:
## Init brukva client here and connect it
def release(client):
''' release a client at the end of a request '''
key = "%s:%s:%s" % (client.connection.host, client.connection.port, client.connection.db)
self.__conns.setdefault(key, []).append(client)
Run Code Online (Sandbox Code Playgroud)
这可能有点棘手,但这就是主要思想。