lim*_*boy 26 python redis redis-py
我想使用redis'pubsub来传输一些消息,但不希望被阻止使用listen,如下面的代码:
import redis
rc = redis.Redis()
ps = rc.pubsub()
ps.subscribe(['foo', 'bar'])
rc.publish('foo', 'hello world')
for item in ps.listen():
if item['type'] == 'message':
print item['channel']
print item['data']
Run Code Online (Sandbox Code Playgroud)
最后一for节将阻止.我只是想检查一个给定的通道是否有数据,我该如何做到这一点?有check类似的方法吗?
var*_*tec 43
如果您正在考虑非阻塞,异步处理,那么您可能正在使用(或应该使用)异步框架/服务器.
如果你正在使用Tornado,那就是Tornado-Redis.它使用本机Tornado生成器调用.它的Websocket演示提供了如何将它与pub/sub结合使用的示例.
似乎你可以使用Redis-py与Gevent结合使用Gevent的猴子补丁(gevent.monkey.patch_all()).
更新:自原始答案以来已经过去了5年,同时Python获得了本机异步IO支持.现在有AIORedis,一个异步IO Redis客户端.
dal*_*ore 15
接受的答案已经过时,因为redis-py建议您使用非阻塞get_message().但它也提供了一种轻松使用线程的方法.
https://pypi.python.org/pypi/redis
阅读消息有三种不同的策略.
在幕后,get_message()使用系统的"select"模块快速轮询连接的套接字.如果有可供读取的数据,get_message()将读取它,格式化消息并将其返回或传递给消息处理程序.如果没有要读取的数据,get_message()将立即返回None.这使得集成到应用程序内的现有事件循环中变得微不足道.
while True:
message = p.get_message()
if message:
# do something with the message
time.sleep(0.001) # be nice to the system :)
Run Code Online (Sandbox Code Playgroud)
较早版本的redis-py只能使用pubsub.listen()读取消息.listen()是一个阻塞直到消息可用的生成器.如果您的应用程序不需要执行任何其他操作,只需接收和处理从redis收到的消息,listen()是一种简单的方法来启动运行.
for message in p.listen():
# do something with the message
Run Code Online (Sandbox Code Playgroud)
第三个选项在单独的线程中运行事件循环.pubsub.run_in_thread()创建一个新线程并启动事件循环.线程对象返回给run_in_thread()的调用者.调用者可以使用thread.stop()方法来关闭事件循环和线程.在幕后,这只是一个围绕get_message()的包装器,它在一个单独的线程中运行,实际上为你创建了一个微小的非阻塞事件循环.run_in_thread()接受一个可选的sleep_time参数.如果指定,则事件循环将使用循环的每次迭代中的值调用time.sleep().
注意:由于我们在单独的线程中运行,因此无法处理未使用已注册的消息处理程序自动处理的消息.因此,如果您订阅了没有附加消息处理程序的模式或通道,redis-py会阻止您调用run_in_thread().
p.subscribe(**{'my-channel': my_handler})
thread = p.run_in_thread(sleep_time=0.001)
# the event loop is now running in the background processing messages
# when it's time to shut it down...
thread.stop()
Run Code Online (Sandbox Code Playgroud)
所以要回答你的问题,只需要在知道邮件是否到达时检查get_message.
小智 13
新版本的redis-py支持异步pubsub,有关详细信息,请查看https://github.com/andymccurdy/redis-py.这是文档本身的一个例子:
while True:
message = p.get_message()
if message:
# do something with the message
time.sleep(0.001) # be nice to the system :)
Run Code Online (Sandbox Code Playgroud)
我认为那是不可能的.频道没有任何"当前数据",您订阅频道并开始接收频道上其他客户端正在推送的消息,因此它是一个阻止API.另外,如果你看一下pub/sub 的Redis Commands文档,它会更清楚.
这是阻塞阻塞侦听器的一个工作示例.
import sys
import cmd
import redis
import threading
def monitor():
r = redis.Redis(YOURHOST, YOURPORT, YOURPASSWORD, db=0)
channel = sys.argv[1]
p = r.pubsub()
p.subscribe(channel)
print 'monitoring channel', channel
for m in p.listen():
print m['data']
class my_cmd(cmd.Cmd):
"""Simple command processor example."""
def do_start(self, line):
my_thread.start()
def do_EOF(self, line):
return True
if __name__ == '__main__':
if len(sys.argv) == 1:
print "missing argument! please provide the channel name."
else:
my_thread = threading.Thread(target=monitor)
my_thread.setDaemon(True)
my_cmd().cmdloop()
Run Code Online (Sandbox Code Playgroud)