PubSub如何在BookSleeve/Redis中工作?

Mat*_*olf 3 c# redis servicestack booksleeve

我想知道使用BookSleeve发布和订阅频道的最佳方式是什么.我目前实现了几种静态方法(见下文),这些方法允许我将内容发布到特定通道,并将新创建的通道存储在其中private static Dictionary<string, RedisSubscriberConnection> subscribedChannels;.

这是正确的方法,因为我想发布到通道并订阅同一个应用程序中的通道(注意:我的包装器是一个静态类).即使我想发布和订阅,是否足以创建一个频道?显然,我不会发布到在同一个应用程序中订阅的相同频道.但我测试了它并且它有效:

 RedisClient.SubscribeToChannel("Test").Wait();
 RedisClient.Publish("Test", "Test Message");
Run Code Online (Sandbox Code Playgroud)

它起作用了.

在这里我的问题:

1)设置专用发布频道和专用订阅频道而不是为两者使用一个频道是否更有效?

2)语义上"channel"和"PatternSubscription"有什么区别?我的理解是,我可以PatternSubscription()在同一个频道上订阅几个"主题",对吗?但是如果我想为每个"主题"调用不同的回调,我必须为每个主题设置一个正确的通道吗?这样有效吗?或者你会建议反对吗?

这里是代码片段.

谢谢!!!

    public static Task<long> Publish(string channel, byte[] message)
    {
        return connection.Publish(channel, message);
    }

    public static Task SubscribeToChannel(string channelName)
    {
        string subscriptionString = ChannelSubscriptionString(channelName);

        RedisSubscriberConnection channel = connection.GetOpenSubscriberChannel();

        subscribedChannels[subscriptionString] = channel;

        return channel.PatternSubscribe(subscriptionString, OnSubscribedChannelMessage);
    }

    public static Task UnsubscribeFromChannel(string channelName)
    {
        string subscriptionString = ChannelSubscriptionString(channelName);

        if (subscribedChannels.Keys.Contains(subscriptionString))
        {
            RedisSubscriberConnection channel = subscribedChannels[subscriptionString];

            Task  task = channel.PatternUnsubscribe(subscriptionString);

            //remove channel subscription
            channel.Close(true);
            subscribedChannels.Remove(subscriptionString);

            return task;
        }
        else
        {
            return null;
        }
    }

    private static string ChannelSubscriptionString(string channelName)
    {
        return channelName + "*";
    }
Run Code Online (Sandbox Code Playgroud)

Mar*_*ell 6

1:你的例子中只有一个频道(Test); 频道只是用于特定发布/订阅交换的名称.但是,由于redis API的工作原理,有必要使用2个连接.具有任何订阅的连接不能执行任何其他操作,除了:

  • 听取消息
  • 管理自己的订阅(subscribe,psubscribe,unsubscribe,punsubscribe)

但是,我不明白这一点:

private static Dictionary<string, RedisSubscriberConnection>
Run Code Online (Sandbox Code Playgroud)

您不应该需要多个订户连接,除非您正在为您提供特定的服务.单个订户连接可以处理任意数量的订阅.快速检查client list我的一台服务器,并且我有一个连接(在撰写本文时)有23,002个订阅.哪个可能会减少,但是:它有效.

2:模式订阅支持通配符; 因此/topic/1,/topic/2/您可以订阅,而不是订阅等/topic/*.所使用的实际信道的名称publish作为回调签名的一部分提供给接收方.

要么可以工作.应该注意的是,性能publish受到唯一订阅总数的影响 - 但坦率地说它仍然是愚蠢的快速(如:0ms),即使你有数十万个订阅频道使用subscribe而不是psubscribe.

但来自 publish

时间复杂度:O(N + M)其中N是订阅接收信道的客户端数量,M是订阅模式的总数(由任何客户端).

我建议阅读pub/sub的redis文档.


编辑后续问题:

a)我假设我必须同步"发布"(使用Result或Wait())如果我想保证在接收项目时保留从同一发布者发送项目的顺序,对吗?

这根本不会有任何区别; 既然你提到Result/ Wait(),我假设你在谈论BookSleeve - 在这种情况下,多路复用器已经保留了命令顺序.Redis本身是单线程的,并且将始终按顺序处理单个连接上的命令.但是:订阅者的回调可以异步执行,并且可以(单独)传递给工作者线程.我目前正在调查我是否可以强制执行此操作RedisSubscriberConnection.

更新:从1.3.22开始,您可以设置CompletionModePreserveOrder- 然后所有回调将按顺序完成,而不是同时完成.

b)根据您的建议进行调整后,无论有效负载的大小如何,我都会在发布少量项目时获得出色的表现.但是,当同一发布者发送100,000个或更多项目时,性能会迅速下降(从我的机器发送到7-8秒).

首先,那个时间听起来很高 - 在本地测试我得到(100,000个出版物,包括等待所有这些的响应)1766ms(本地)或1219ms(远程)(这可能听起来反直觉,但我的"本地"不是'运行相同版本的redis;我的"远程"是Centos上的2.6.12;我的"本地"是Windows上的2.6.8-pre2.

我无法使您的实际服务器更快或加速网络,但是:如果这是数据包碎片,我已添加(仅适合您)一对SuspendFlush()/ ResumeFlush().这会禁用eager-flush(即当发送队列为空时;其他类型的刷新仍然发生); 你可能会发现这有帮助:

conn.SuspendFlush();
try {
    // start lots of operations...
} finally {
    conn.ResumeFlush();
}
Run Code Online (Sandbox Code Playgroud)

请注意,Wait在恢复之前不应该这样做,因为在调用之前,ResumeFlush()仍然可以在发送缓冲区中执行某些操作.有了这一切,我得到(100,000次操作):

local: 1766ms (eager-flush) vs 1554ms (suspend-flush)
remote: 1219ms (eager-flush) vs 796ms (suspend-flush)
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,它对远程服务器的帮助更大,因为它将通过网络减少数据包.

我不能使用交易,因为后来待发布的项目并非一次全部可用.有没有办法优化这些知识?

认为这可以通过以上方式解决 - 但请注意最近CreateBatch也加入了.批处理操作很像交易 - 只是:没有交易.同样,它是另一种减少数据包碎片的机制.在您的特定情况下,我怀疑暂停/恢复(在冲洗时)是您最好的选择.

您是否建议使用一个通用的RedisConnection和一个RedisSubscriberConnection或任何其他配置来使这样的包装器执行所需的功能?

只要你不执行阻塞操作(blpop,brpop,brpoplpush等),或将过大的BLOB沿着电线(潜在延迟等操作,同时它清除),那么每个类型的单个连接通常工作得很好.但YMMV取决于您的确切使用要求.