Tag: spring-data-redis-reactive

How to continuously listen to a Redis stream using the Lettuce Java library

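I am trying to listen to a Redis stream and process each message as it arrives. I am using the async commands and I expect messages to be pushed rather than pulled, so I don't think I need a while loop. But the code below does not seem to work.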

public static void main(String[] args) throws InterruptedException {

    RedisClient redisClient = RedisClient
        .create("redis://localhost:6379/");
    StatefulRedisConnection<String, String> connection
        = redisClient.connect();
    RedisAsyncCommands<String, String> commands = connection.async();
    commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs());
    commands
        .xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
        .thenAccept(System.out::println);

    Thread.currentThread().join();
}

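It only prints whatever is already in the stream when the program starts, and does not print messages added while the program is running. Shouldn't the callback be invoked for every message newly added to the stream?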
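The behaviour described here is what a one-shot future gives you: a single asynchronous read completes once, so a callback attached with `thenAccept` also runs once. The sketch below illustrates this using only the JDK. `STREAM`, `readOnce` and `readLoop` are hypothetical stand-ins, not Lettuce APIs; with Lettuce, the equivalent would presumably be to re-issue `xreadgroup` after each completion, optionally with a blocking read argument.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class OneShotVersusLoop {

    // Hypothetical stand-in for the stream: producers add entries here.
    static final BlockingQueue<String> STREAM = new LinkedBlockingQueue<>();

    // Hypothetical stand-in for one asynchronous read command. The returned
    // future completes exactly once, with the next entry or with null if
    // nothing arrives within the timeout.
    static CompletableFuture<String> readOnce(long timeoutMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return STREAM.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        });
    }

    // Re-issues readOnce after every completion until `count` entries
    // have been passed to the handler.
    static CompletableFuture<Void> readLoop(int count, Consumer<String> handler) {
        if (count == 0) {
            return CompletableFuture.completedFuture(null);
        }
        return readOnce(1000).thenCompose(entry -> {
            if (entry == null) {
                return readLoop(count, handler); // timed out, read again
            }
            handler.accept(entry);
            return readLoop(count - 1, handler);
        });
    }

    public static void main(String[] args) throws Exception {
        List<String> seen = new CopyOnWriteArrayList<>();

        STREAM.add("a");
        // One-shot read: the callback runs once, for the entry already present.
        readOnce(1000).thenAccept(seen::add).get();

        // These entries arrive later; nothing handles them until a new read is issued.
        STREAM.add("b");
        STREAM.add("c");

        // Re-issuing the read is what delivers the later entries.
        readLoop(2, seen::add).get();

        System.out.println(seen); // prints [a, b, c]
    }
}
```

The loop is expressed as chained futures rather than a `while` loop, so no thread is parked between reads, but some form of repeated read is still needed.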

java lettuce spring-data-redis spring-data-redis-reactive redis-streams

7 votes, 1 answer, 3086 views

Unable to connect to Redis; nested exception is io.lettuce.core.RedisConnectionException using ReactiveRedisTemplate

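I am new to reactive programming. I need to connect to Redis to save and fetch some data. The Redis instance lives in the cloud. I am using the Lettuce connection factory to establish the connection.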

The request fails while the connection to Redis is being established. Here is my Redis configuration class:

package com.sap.slh.tax.attributes.determination.springwebfluxdemo.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.annotation.EnableAsync;

import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.util.JsonUtil;

@Configuration
@EnableAsync
public class RedisConfig {
    private static final Logger log = LoggerFactory.getLogger(RedisConfig.class);

    @Value("${vcap.services.redis.credentials.hostname:10.11.241.101}")
    private String host;

    @Value("${vcap.services.redis.credentials.port:36516}")
    private int port;

    @Value("$vcap.services.redis.credentials.password:123456788")
    private String password;

    @Bean
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() { …

redis lettuce spring-data-redis reactive spring-data-redis-reactive

5 votes, 1 answer, 90k views