redis pub sub with jedis,sub crashes with error

bab*_*ira 3 java publish-subscribe redis jedis

所有

我安装了最新的Redis 2.4.16并尝试将其Pub/Sub系统与java一起使用.我每隔一秒就给一个频道发一条消息.Publisher没有问题,但Subscriber与邮件崩溃

例外:

redis.clients.jedis.exceptions.JedisDataException: ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context
at redis.clients.jedis.Protocol.processError(Protocol.java:59)
at redis.clients.jedis.Protocol.process(Protocol.java:66)
at redis.clients.jedis.Protocol.read(Protocol.java:131)
at redis.clients.jedis.Connection.getObjectMultiBulkReply(Connection.java:206)
at redis.clients.jedis.JedisPubSub.process(JedisPubSub.java:88)
at redis.clients.jedis.JedisPubSub.proceed(JedisPubSub.java:83)
at redis.clients.jedis.Jedis.subscribe(Jedis.java:1971)
at com.jedis.test.JedisSub$1.run(JedisSub.java:22)
at java.lang.Thread.run(Thread.java:680)
Run Code Online (Sandbox Code Playgroud)

这是我的代码:

出版商:

        final Jedis jedis = new Jedis("localhost");

        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        newFixedThreadPool.submit(new Runnable() {

            @Override
            public void run() {
                while(true){
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    jedis.publish("CC", new Date().toString());
                }

            }
        });
Run Code Online (Sandbox Code Playgroud)

订户:

        JedisPool jedisPool = new JedisPool(poolConfig,"localhost", 6379, 100);
        final Jedis subscriberJedis = jedisPool.getResource();


        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    subscriberJedis.subscribe(new JedisPubSub() …..,"CC");
                } catch (Exception e) {
                   e.printStackTrace();
                }
            }
        }).start();

        jedisPool.returnResource(subscriberJedis);
Run Code Online (Sandbox Code Playgroud)

池配置:

    JedisPoolConfig poolConfig = new JedisPoolConfig();
    poolConfig.maxActive = 10;
    poolConfig.maxIdle = 5;
    poolConfig.minIdle = 1;
    poolConfig.testOnBorrow = true;
    poolConfig.numTestsPerEvictionRun = 10;
    poolConfig.timeBetweenEvictionRunsMillis = 60000;
    poolConfig.maxWait = 3000;
    poolConfig.whenExhaustedAction = org.apache.commons.pool.impl.GenericObjectPool.WHEN_EXHAUSTED_FAIL;
Run Code Online (Sandbox Code Playgroud)

为了安装Redis,我只是使用了命令

make PREFIX=/Users/ggg/dev/dist/redis/ install
Run Code Online (Sandbox Code Playgroud)

在此之后我没有使用 ./install_server.sh

Jedis版本是2.1.0,平台是Mac OS X.

注意:我注意到用户在启动后大约30秒左右崩溃.

Did*_*zia 8

发布者和订阅者的代码都以自己的方式出错.

该错误来自于发布者和订阅者之间无法共享Redis连接的事实.实际上,您需要为发布者提供连接(或连接池),并且只需要为订户线程建立一个专用连接.每个进程运行一个订户线程通常就足够了.

在这里,在订户线程完成之前,将subscriberJedis连接过早地返回到池,以便共享连接.

在出版商:

由于您有10个线程池,因此不应跨这些线程共享唯一连接.这是使用连接池的理想场所,必须在每个线程中抓取和释放连接.

    // This should be a global singleton
    JedisPool jedisPool = new JedisPool(poolConfig,"localhost", 6379, 100);

    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
    newFixedThreadPool.submit(new Runnable() {

        @Override
        public void run() {
            while(true){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Jedis jedis = jedisPool.getResource();
                try {
                   jedis.publish("CC", new Date().toString());
                } catch (Exception e) {
                   e.printStackTrace();
                } finally {
                   jedisPool.returnResource(jedis);
                }
            }

        }
    });
Run Code Online (Sandbox Code Playgroud)

在订户中:

在订户中,您需要专用连接.

    new Thread(new Runnable() {
        @Override
        public void run() {
            Jedis subscriberJedis = new Jedis("localhost");
            try {
                subscriberJedis.subscribe(new JedisPubSub() …..,"CC");
            } catch (Exception e) {
               e.printStackTrace();
            }
        }
    }).start();
Run Code Online (Sandbox Code Playgroud)

如果您需要订阅不同的渠道或模式,最好在同一个线程和同一个连接中设置其他订阅.