我正在尝试监听 Redis 流并在消息到达时对其进行处理。我正在使用异步命令,我希望消息被推送而不是被拉取。所以我认为不需要 while 循环。但下面的代码似乎不起作用。
public static void main(String[] args) throws InterruptedException {
RedisClient redisClient = RedisClient
.create("redis://localhost:6379/");
StatefulRedisConnection<String, String> connection
= redisClient.connect();
RedisAsyncCommands commands = connection.async();
commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs());
commands
.xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
.thenAccept(System.out::println);
Thread.currentThread().join();
}
Run Code Online (Sandbox Code Playgroud)
它只打印程序启动时流中的所有内容,而不打印程序运行时添加的消息。是否应该为新添加到流中的每条消息调用回调?
java lettuce spring-data-redis spring-data-redis-reactive redis-streams
我是反应式编程的新手。我需要连接到 Redis 来保存和获取一些数据。Redis 实例存在于云中。我使用 Lettuce Connection 工厂来建立连接。
与redis建立连接时,请求失败。这是我的 Redis 配置类:
package com.sap.slh.tax.attributes.determination.springwebfluxdemo.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.annotation.EnableAsync;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.util.JsonUtil;
@Configuration
@EnableAsync
public class RedisConfig {
private static final Logger log = LoggerFactory.getLogger(RedisConfig.class);
@Value("${vcap.services.redis.credentials.hostname:10.11.241.101}")
private String host;
@Value("${vcap.services.redis.credentials.port:36516}")
private int port;
@Value("$vcap.services.redis.credentials.password:123456788")
private String password;
@Bean
public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() { …Run Code Online (Sandbox Code Playgroud) redis lettuce spring-data-redis reactive spring-data-redis-reactive