非阻塞Redis pubsub可能吗?

lim*_*boy 26 python redis redis-py

我想使用redis'pubsub来传输一些消息,但不希望被阻止使用listen,如下面的代码:

import redis
rc = redis.Redis()

ps = rc.pubsub()
ps.subscribe(['foo', 'bar'])

rc.publish('foo', 'hello world')

for item in ps.listen():
    if item['type'] == 'message':
        print item['channel']
        print item['data']
Run Code Online (Sandbox Code Playgroud)

最后一for节将阻止.我只是想检查一个给定的通道是否有数据,我该如何做到这一点?有check类似的方法吗?

var*_*tec 43

如果您正在考虑非阻塞,异步处理,那么您可能正在使用(或应该使用)异步框架/服务器.

更新:自原始答案以来已经过去了5年,同时Python获得了本机异步IO支持.现在有AIORedis,一个异步IO Redis客户端.

  • 这是应该检查标记的正确答案.我不确定为什么人们会重新发明轮子,有一个已经存在的异步客户端用于redis,在这样一个客户端的存在下产生新线程并不是真的需要. (2认同)

dal*_*ore 15

接受的答案已经过时,因为redis-py建议您使用非阻塞get_message().但它也提供了一种轻松使用线程的方法.

https://pypi.python.org/pypi/redis

阅读消息有三种不同的策略.

在幕后,get_message()使用系统的"select"模块快速轮询连接的套接字.如果有可供读取的数据,get_message()将读取它,格式化消息并将其返回或传递给消息处理程序.如果没有要读取的数据,get_message()将立即返回None.这使得集成到应用程序内的现有事件循环中变得微不足道.

 while True:
     message = p.get_message()
     if message:
         # do something with the message
     time.sleep(0.001)  # be nice to the system :)
Run Code Online (Sandbox Code Playgroud)

较早版本的redis-py只能使用pubsub.listen()读取消息.listen()是一个阻塞直到消息可用的生成器.如果您的应用程序不需要执行任何其他操作,只需接收和处理从redis收到的消息,listen()是一种简单的方法来启动运行.

 for message in p.listen():
     # do something with the message
Run Code Online (Sandbox Code Playgroud)

第三个选项在单独的线程中运行事件循环.pubsub.run_in_thread()创建一个新线程并启动事件循环.线程对象返回给run_in_thread()的调用者.调用者可以使用thread.stop()方法来关闭事件循环和线程.在幕后,这只是一个围绕get_message()的包装器,它在一个单独的线程中运行,实际上为你创建了一个微小的非阻塞事件循环.run_in_thread()接受一个可选的sleep_time参数.如果指定,则事件循环将使用循环的每次迭代中的值调用time.sleep().

注意:由于我们在单独的线程中运行,因此无法处理未使用已注册的消息处理程序自动处理的消息.因此,如果您订阅了没有附加消息处理程序的模式或通道,redis-py会阻止您调用run_in_thread().

p.subscribe(**{'my-channel': my_handler})
thread = p.run_in_thread(sleep_time=0.001)
# the event loop is now running in the background processing messages
# when it's time to shut it down...
thread.stop()
Run Code Online (Sandbox Code Playgroud)

所以要回答你的问题,只需要在知道邮件是否到达时检查get_message.


小智 13

新版本的redis-py支持异步pubsub,有关详细信息,请查看https://github.com/andymccurdy/redis-py.这是文档本身的一个例子:

while True:
    message = p.get_message()
    if message:
        # do something with the message
    time.sleep(0.001)  # be nice to the system :)
Run Code Online (Sandbox Code Playgroud)


Ank*_*kur 7

我认为那是不可能的.频道没有任何"当前数据",您订阅频道并开始接收频道上其他客户端正在推送的消息,因此它是一个阻止API.另外,如果你看一下pub/sub 的Redis Commands文档,它会更清楚.


dir*_*kk0 7

这是阻塞阻塞侦听器的一个工作示例.

import sys
import cmd
import redis
import threading


def monitor():
    r = redis.Redis(YOURHOST, YOURPORT, YOURPASSWORD, db=0)

    channel = sys.argv[1]
    p = r.pubsub()
    p.subscribe(channel)

    print 'monitoring channel', channel
    for m in p.listen():
        print m['data']


class my_cmd(cmd.Cmd):
    """Simple command processor example."""

    def do_start(self, line):
        my_thread.start()

    def do_EOF(self, line):
        return True


if __name__ == '__main__':
    if len(sys.argv) == 1:
        print "missing argument! please provide the channel name."
    else:
        my_thread = threading.Thread(target=monitor)
        my_thread.setDaemon(True)

        my_cmd().cmdloop()
Run Code Online (Sandbox Code Playgroud)