Ura*_*lpa 5 redis lettuce spring-data-redis reactive spring-data-redis-reactive
我是反应式编程的新手。我需要连接到 Redis 来保存和获取一些数据。Redis 实例存在于云中。我使用 Lettuce Connection 工厂来建立连接。
与redis建立连接时,请求失败。这是我的 Redis 配置类:
package com.sap.slh.tax.attributes.determination.springwebfluxdemo.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.annotation.EnableAsync;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.util.JsonUtil;
@Configuration
@EnableAsync
public class RedisConfig {
    private static final Logger log = LoggerFactory.getLogger(RedisConfig.class);
    @Value("${vcap.services.redis.credentials.hostname:10.11.241.101}")
    private String host;
    @Value("${vcap.services.redis.credentials.port:36516}")
    private int port;
    @Value("$vcap.services.redis.credentials.password:123456788")
    private String password;
    @Bean
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(host, port);
        redisStandaloneConfiguration.setPassword(RedisPassword.of(password));
        redisStandaloneConfiguration.setDatabase(0);
        log.error("Redis standalone configuration{}",JsonUtil.toJsonString(redisStandaloneConfiguration));
        LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder().build();
        LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(redisStandaloneConfiguration, clientConfig);
        lettuceConnectionFactory.afterPropertiesSet();
        return lettuceConnectionFactory;
    }
    @Bean
    ReactiveRedisOperations<TaxDetails, TaxLine> redisOperations(
            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        Jackson2JsonRedisSerializer<TaxDetails> serializer = new Jackson2JsonRedisSerializer<>(TaxDetails.class);
        Jackson2JsonRedisSerializer<TaxLine> serializer1 = new Jackson2JsonRedisSerializer<>(TaxLine.class);
        RedisSerializationContext.RedisSerializationContextBuilder<TaxDetails, TaxLine> builder = RedisSerializationContext
                .newSerializationContext(new StringRedisSerializer());
        RedisSerializationContext<TaxDetails, TaxLine> context = builder.key(serializer).value(serializer1).build();
        ;
        return new ReactiveRedisTemplate<>(
                reactiveRedisConnectionFactory, context);
    }
}
这是我的查找服务类,它在请求期间实际上与 Redis 进行通信
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.stereotype.Service;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.RedisRepo;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.util.JsonUtil;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service
public class RedisTaxLineLookUpService {
    private static final Logger log = LoggerFactory.getLogger(RedisTaxLineLookUpService.class);
    @Autowired
    private ReactiveRedisOperations<TaxDetails, TaxLine> redisOperations;
    public Flux<TaxLine> get(TaxDetails taxDetails) {
        log.info("going to call redis to fetch tax lines{}", JsonUtil.toJsonString(taxDetails));
        return redisOperations.keys(taxDetails).flatMap(redisOperations.opsForValue()::get);
    }
    public Mono<RedisRepo> set(RedisRepo redisRepo) {
        log.info("going to call redis to save tax lines{}", JsonUtil.toJsonString(redisRepo.getTaxDetails()));
        return redisOperations.opsForValue().set(redisRepo.getTaxDetails(), redisRepo.getTaxLine())
                .map(__ -> redisRepo);
    }
}
堆栈跟踪 :
2020-03-26T16:27:54.513+0000 [APP/PROC/WEB/0] OUT                org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is io.lettuce.core.RedisConnectionException: Unable to connect to 10.11.241.101:36516 |    at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getNativeConnection(LettuceConnectionFactory.java:1199) |    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:  | Error has been observed at the following site(s): |   |_ checkpoint ? Handler com.sap.slh.tax.attributes.determination.springwebfluxdemo.controller.TaxLinesDeterminationController#saveTaxLines(RedisRepo) [DispatcherHandler] |     |_ checkpoint ? HTTP POST "/tax/lines/save/" [ExceptionHandlingWebHandler] | Stack trace: |         at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getNativeConnection(LettuceConnectionFactory.java:1199) |        at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getConnection(LettuceConnectionFactory.java:1178) |      at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getSharedReactiveConnection(LettuceConnectionFactory.java:952) |      at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getReactiveConnection(LettuceConnectionFactory.java:429) |        at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getReactiveConnection(LettuceConnectionFactory.java:94) |         at org.springframework.data.redis.core.ReactiveRedisTemplate.lambda$doInConnection$0(ReactiveRedisTemplate.java:198) |      at reactor.core.publisher.MonoSupplier.call(MonoSupplier.java:85) |         at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:80) |      at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) |        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) |        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) |      at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) |       at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) |        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) |       at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) |       at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) |      at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:296) |      at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) |      at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) |        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) |      at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:247) |         at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:329) |       at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:173) |       at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) |      at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) |         at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) |        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) |      at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) |        at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103) |       at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287) |       at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:330) |      at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) |      at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:160) |      at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) |      at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) |       at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) |      at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:419) |      at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:209) |      at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:367) |      at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:363) |      at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:489) |        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) |       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
任何建议或答案都会非常有帮助!提前致谢 !
我更新了我的 RedisConfig 类,如下所示:
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConfiguration;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;
import io.lettuce.core.RedisURI;
import io.pivotal.cfenv.core.CfEnv;
@Configuration
public class RedisConfig {
    CfEnv cfEnv = new CfEnv();
    String tag = "redis";
    String redisHost = cfEnv.findCredentialsByTag(tag).getHost();
    @Bean
    @Primary
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory(RedisConfiguration defaultRedisConfig) {
        LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
                .commandTimeout(Duration.ofMillis(60000)).build();
        return new LettuceConnectionFactory(defaultRedisConfig, clientConfig);
    }
    @Bean
    public RedisConfiguration defaultRedisConfig() {
        if (redisHost != null) {
//          RedisStandaloneConfiguration config = new RedisStandaloneConfiguration("127.0.0.1", 6379);
            RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
            String redisPort = cfEnv.findCredentialsByTag(tag).getPort();
            String redisPassword = cfEnv.findCredentialsByTag(tag).getPassword();
            config.setHostName(redisHost);
            config.setPassword(RedisPassword.of(redisPassword));
            config.setPort(Integer.parseInt(redisPort));
            config.setDatabase(2);
            return config;
        } else {
            RedisSentinelConfiguration config = new RedisSentinelConfiguration();
            String uri = cfEnv.findCredentialsByTag(tag).getUri();
            RedisURI redisURI = RedisURI.create(uri);
            config.master(redisURI.getSentinelMasterId());
            List<RedisNode> nodes = redisURI.getSentinels().stream()
                    .map(redisUri -> populateNode(redisUri.getHost(), redisUri.getPort())).collect(Collectors.toList());
            nodes.forEach(node -> config.addSentinel(node));
            config.setPassword(RedisPassword.of(redisURI.getPassword()));
            config.setDatabase(2);
            return config;
        }
    }
    @Bean
    public ReactiveRedisOperations<TaxDetails, TaxLine> reactiveRedisTemplate(
        ReactiveRedisConnectionFactory factory) {
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer<TaxLine> valueSerializer = new Jackson2JsonRedisSerializer<>(
            TaxLine.class);
        Jackson2JsonRedisSerializer<TaxDetails> valueSerializer1 = new Jackson2JsonRedisSerializer<>(
                TaxDetails.class);
        RedisSerializationContext.RedisSerializationContextBuilder<TaxDetails, TaxLine> builder = RedisSerializationContext
            .newSerializationContext(keySerializer);
        RedisSerializationContext<TaxDetails, TaxLine> context = builder.key(valueSerializer1).value(valueSerializer).build();
        return new ReactiveRedisTemplate<>(factory, context);
    }
    private RedisNode populateNode(String host, Integer port) {
        return new RedisNode(host, port);
    }
}
cfEnv 的依赖项:
            <groupId>io.pivotal.cfenv</groupId>
            <artifactId>java-cfenv-boot</artifactId>
            <version>2.1.1.RELEASE</version>
        </dependency>