Redis Pubsub和消息队列

Mar*_*lio 50 python redis redis-cli

我的整体问题是:使用Redis for PubSub,当发布者将消息推送到某个频道的速度超过订阅者能够阅读它们时,消息会发生什么变化?

例如,假设我有:

  • 一个简单的发布者以2 msg/sec的速率发布消息.
  • 一个简单的用户以1 msg/sec的速率读取消息.

我天真的假设是订阅者只会看到50%的消息发布到Redis上.为了测试这个理论,我写了两个脚本:

pub.py

queue = redis.StrictRedis(host='localhost', port=6379, db=0)
channel = queue.pubsub()

for i in range(10): 
    queue.publish("test", i)
    time.sleep(0.5)
Run Code Online (Sandbox Code Playgroud)

sub.py

r = redis.StrictRedis(host='localhost', port=6379, db=0)
p = r.pubsub()
p.subscribe('test')

while True:
    message = p.get_message()
    if message:
        print "Subscriber: %s" % message['data']
    time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

结果

  • 当我先跑sub.py,然后紧接着pub.py,我发现sub.py实际上显示了所有的消息(1-10),一个接一个地延迟了1秒.我最初的假设是错误的,Redis正在排队消息.需要更多测试.
  • 当我先跑pub.py,然后在跑步前等了5秒sub.py,我发现sub.py只显示了消息的后半部分(5-10).我本来会假设这个,但鉴于我之前的结果,我会认为消息排队,这导致我得出以下结论......

结论

  • Redis服务器似乎为每个客户端为每个通道排队消息.
  • 只要客户端正在监听,它读取消息的速度并不重要.只要它已连接,消息将为该通道的该客户端保持排队.

剩下的问题

  • 这些结论有效吗?
  • 如果是这样,客户端/频道消息将保持排队多长时间?
  • 如果是,是否有redis-cli info命令查看排队的消息数(对于每个客户端/通道)?

Did*_*zia 93

测试是有效的,但结论部分错误.

Redis不会在pub/sub通道上排队.相反,它倾向于从发布者套接字读取项目,并在所有订阅者套接字中写入项目,理想情况是在事件循环的同一次迭代中.Redis数据结构中没有任何内容.

现在,正如您所展示的那样,仍然存在某种缓冲.这是由于使用了TCP/IP套接字和Redis通信缓冲区.

套接字有缓冲区,当然,TCP带有一些流控制机制.它可以避免缓冲区满时丢失数据.如果订户不够快,数据将在其套接字缓冲区中累积.当它已满时,TCP将阻止通信并阻止Redis在套接字中提取更多信息.

Redis还管理输出通信缓冲区(在其中一个套接字之上),以生成使用Redis协议格式化的数据.因此,当套接字的输出缓冲区已满时,事件循环会将套接字标记为不可写,并且数据将保留在Redis输出缓冲区中.

如果TCP连接仍然有效,则数据可以在缓冲区中保留很长时间.现在,socket和Redis输出缓冲区都被绑定了.如果用户真的太慢,并且大量数据累积,Redis最终将关闭与订户的连接(作为安全机制).

默认情况下,对于pub/sub,Redis的软限制为8 MB,每个连接缓冲区的硬限制为32 MB.如果输出缓冲区达到硬限制,或者它在软限制和硬限制之间保持超过60秒,则与慢速用户的连接将被关闭.

知道待处理消息的数量并不容易.可以通过查看套接字缓冲区中的挂起信息的大小和Redis输出缓冲区来评估它.

对于Redis输出缓冲区,您可以使用CLIENT LIST命令(来自redis-cli).输出缓冲区的大小在obl和oll字段中返回(以字节为单位).

对于套接字缓冲区,没有Redis命令.但是,在Linux上,可以构建一个脚本来解释/ proc/net/tcp文件的内容.在这里查看示例.此脚本可能需要适应您的系统.

  • “该问题有新答案-单击_here_以加载它们”-您输入的速度比我快,给出了一个了不起的答案:) (4认同)
  • 抱歉:-)我被这个问题所激励! (3认同)
  • 不,这是不可能的.缓冲区中的内容已经在Redis协议中编码,不能在随机位置任意剪切.它可能需要重新分析缓冲区的内容. (2认同)