如何从Java中的生菜RedisPubSubListener获取消息?

And*_*ard 4 java publish-subscribe redis

我刚刚开始使用redis,生菜和异步编码.现在遗憾的是,我找不到任何关于如何从侦听器获取消息到我的程序中的示例.我在这些函数上找到的javadoc或任何其他信息也没有多大帮助.那么有人可以解释如何将已发布的消息转换为字符串吗?

我的代码目前看起来像这样:

RedisClient client = RedisClient.create("redis://" + host + "/0");
StatefulRedisPubSubConnection<String, String> con = client.connectPubSub();
RedisPubSubListener<String, String> listener = new RedisPubSubListener<String, String>() {@Override methods to be implemented???}
con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");
Run Code Online (Sandbox Code Playgroud)

我很确定我必须实现监听器的消息方法,但我甚至没有任何线索来开始.我知道params代表什么......但是这些方法的返回值为void,因此它们也不会输出任何消息.

那么,哪里开始呢?(完全不解)

mp9*_*1de 10

你开了个好头.Redis Pub/Sub涉及至少两方:

  1. 订阅者
  2. 和出版商

订阅者(猜测并不奇怪)订阅频道,模式或两者.

发布者将消息发布到频道.此设置也需要反映在您的代码中.

我通过使用扩展了你的代码RedisPubSubAdapter,因此代码不需要实现所有方法,只需要我们感兴趣的方法,例如message(channel, message):

RedisClient client = RedisClient.create("redis://" + host + "/0");
StatefulRedisPubSubConnection<String, String> con = client.connectPubSub();

RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {

    @Override
    public void message(String channel, String message) {
        System.out.println(String.format("Channel: %s, Message: %s", channel, message));
    }
};

con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");
Run Code Online (Sandbox Code Playgroud)

一旦添加了监听器并且客户端订阅了一个频道,它就可以接收发布/订阅消息了.当通知到达时,生菜会调用听众的方法.此时,重要的是要了解通知是在I/O线程上处理的,该线程与设置客户端和订阅的线程不同.

让我们来发送方.要向您的频道发送消息,您需要额外的连接(或者使用redis-cli和发行PUBLISH channel message).

StatefulRedisConnection<String, String> sender = client.connect();

sender.sync().publish("channel", "Message 1");
sender.sync().publish("channel", "Message 2");
Run Code Online (Sandbox Code Playgroud)

Redis将发布消息Message 1Message 1在名为的频道上发布channel(不是创意名称,但它现在可以完成这项工作).

如果你连续执行代码并在发送一条消息后稍等一下,那么你很有可能通知监听器并且你看到一些系统输出如:

Channel: channel, Message: Message 1
Channel: channel, Message: Message 2
Run Code Online (Sandbox Code Playgroud)

异步:有什么影响?

现在是异步性的棘手部分.在某些情况下使用异步通信是有益的,但增加了复杂性 如果您可以在结果到达之前完成工作(某些计算直到您需要结果),或者您只想启动I/O并释放您正在处理的线程.服务器应用程序是异步模式的良好环境.典型的服务器具有有限的线程资源,并且它一直运行直到它关闭.在服务器启动时,您将注册订阅.一旦消息进入,它就会在I/O线程上处理,并且会调用您的侦听器

在独立应用程序中使用异步命令执行时(比如简单main),您就有了顺序流.异步消息传递将导致程序在代码流完成后退出.这并不一定意味着接收或处理了发布/订阅消息.如果您将逐个运行两个代码块,则main很可能根本看不到任何输出,因为程序终止的速度比I/O可能发生的要快.现在同步发挥作用.如何处理同步有很多可能性,但现在让我们看看两种替代方案:

  1. CountDownLatch:在发布程序流程之前需要执行许多操作
  2. Thread.sleep(…):等待几毫秒

CountDownLatch使用

final CountDownLatch latch = new CountDownLatch(2);
RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {

    @Override
    public void message(String channel, String message) {
        System.out.println(String.format("Channel: %s, Message: %s", channel, message));
        latch.countDown();
    }
};

// ...
sender.sync().publish("channel", "Message 2");

latch.await();
Run Code Online (Sandbox Code Playgroud)

在上面的代码中,a CountDownLatch有两次倒计时(latch.countDown()).调用latch.await()阻塞主线程(程序流)并使其等待直到CountDownLatch倒计时,从而释放程序以继续.

Thread.sleep(...)使用

RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {

    @Override
    public void message(String channel, String message) {
        System.out.println(String.format("Channel: %s, Message: %s", channel, message));
    }
};

con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");

StatefulRedisConnection<String, String> sender = client.connect();

sender.sync().publish("channel", "Message 1");
sender.sync().publish("channel", "Message 2");

latch.await();

Thread.sleep(1000);
Run Code Online (Sandbox Code Playgroud)

此代码用于Thread.sleep(1000);等待一秒钟(在主线程上).这应该足以接收消息.不要这样做.这种方法快速而且脏,可能适合播放和调试,但避免Thread.sleep使用合理的代码.

有两个参与者的事情

Redis的规定对订阅的频道/模式连接的约束:一旦订阅,你只被允许执行SUBSCRIBE,PSUBSCRIBE,UNSUBSCRIBE,PUNSUBSCRIBE,PINGQUIT命令.PUBLISH不允许在该连接上执行.因此,您需要使用其他连接.