如何配置 Lettuce Redis 集群异步连接池

Mr.*_*r.j 5 spring redis lettuce

我正在配置我的生菜 rediscluster 池。我按照官方文档配置的时候,连接池无法正常初始化,无法获取连接。官方文档指出:

RedisClusterClient clusterClient = 
RedisClusterClient.create(RedisURI.create(host, port));

AsyncPool<StatefulRedisConnection<String, String>> pool =             AsyncConnectionPoolSupport.createBoundedObjectPool(        () -> clusterClient.connectAsync(StringCodec.UTF8),     BoundedPoolConfig.create());
// execute work
CompletableFuture<String> setResult = pool.acquire().thenCompose(connection -> {        
    RedisAsyncCommands<String, String> async = connection.async();    
    async.set("key", "value");
    return async.async.set("key2", "value2").whenComplete((s, throwable) -> pool.release(c));
});

// terminating
pool.closeAsync();

// after pool completion
client.shutdownAsync();
Run Code Online (Sandbox Code Playgroud)

此配置在我的环境中不起作用。然后我添加 minIdle 配置:

final BoundedPoolConfig.Builder builder = BoundedPoolConfig.builder();
builder.minIdle(9);
Run Code Online (Sandbox Code Playgroud)

它在开始时起作用,但是当我循环连接池并多次发送命令时,会引发以下异常:

java.util.concurrent.ExecutionException: java.lang.IllegalStateException:             AsyncPool is closed   at     java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
Run Code Online (Sandbox Code Playgroud)

这是我所有的代码:

private String passwd = "xxxxx";
private String ip = "10.0.0.204";
;

@Bean
@Scope("singleton")
public ClientResources clientResources() {
    final DefaultClientResources defaultClientResources = DefaultClientResources.builder()
                                                                                .ioThreadPoolSize(4)
                                                                                .computationThreadPoolSize(4)
                                                                                .build();

    return defaultClientResources;
}

@Bean(destroyMethod = "shutdown")
@Scope("singleton")
public RedisClusterClient clusterClient(ClientResources clientResources) {
    final String ip = "10.0.0.204";
    final String passwd = "dingXiang123";
    final RedisURI redisURI1 = RedisURI.Builder.redis(ip, 7001).withPassword(passwd).build();
    final RedisURI redisURI2 = RedisURI.Builder.redis(ip, 7002).withPassword(passwd).build();
    final RedisURI redisURI3 = RedisURI.Builder.redis(ip, 7003).withPassword(passwd).build();
    final RedisURI redisURI4 = RedisURI.Builder.redis(ip, 7004).withPassword(passwd).build();
    final RedisURI redisURI5 = RedisURI.Builder.redis(ip, 7005).withPassword(passwd).build();
    final RedisURI redisURI6 = RedisURI.Builder.redis(ip, 7006).withPassword(passwd).build();
    RedisClusterClient clusterClient = null;
    try {
        final List<RedisURI> redisURIS = Arrays.asList(redisURI1, redisURI2, redisURI3, redisURI4, redisURI5, redisURI6);
        clusterClient = RedisClusterClient.create(clientResources, redisURIS);
        ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                                                                                            .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT, ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS)
                                                                                            //???refresh????
                                                                                            .adaptiveRefreshTriggersTimeout(Duration.ofMinutes(3))
                                                                                            .build();


        clusterClient.setOptions(ClusterClientOptions.builder()
                                                     .topologyRefreshOptions(topologyRefreshOptions)
                                                     .autoReconnect(true)
                                                     .pingBeforeActivateConnection(true)
                                                     .build());

        final RedisAdvancedClusterAsyncCommands<String, String> async = clusterClient.connect().async();
        final RedisFuture<String> set = async.set("aa", "aaaaa");
        set.get();
        log.info("????????");
        return clusterClient;
    } catch (Exception e) {
        log.error("lettce?????????{}", e);
        if (clusterClient != null) {
            clusterClient.shutdown();
        }
    }

    return null;
}

/**
 * ?????? Cluter ?????
 *
 * @param clusterClient
 * @return
 */
@Bean()
@DependsOn("clusterClient")
@Scope("singleton")
public BoundedAsyncPool<StatefulRedisClusterConnection<String, String>> lettucePool(RedisClusterClient clusterClient) {
    final BoundedPoolConfig.Builder builder = BoundedPoolConfig.builder();
    builder.minIdle(9);
    final BoundedPoolConfig boundedPoolConfig = builder.build();
    final BoundedAsyncPool<StatefulRedisClusterConnection<String, String>> lettucePool = AsyncConnectionPoolSupport.createBoundedObjectPool(
            () -> clusterClient.connectAsync(StringCodec.UTF8)
            , boundedPoolConfig
    );


    log.info("????????");
    return lettucePool;
}

/**
 * ????????
 *
 * @param lettucePool
 */
@Bean
@DependsOn("lettucePool")
public CompletableFuture<StatefulRedisClusterConnection<String, String>> clusterAsync(BoundedAsyncPool<StatefulRedisClusterConnection<String, String>> lettucePool) {

    final CompletableFuture<StatefulRedisClusterConnection<String, String>> acquire = lettucePool.acquire();
    return acquire;
}
Run Code Online (Sandbox Code Playgroud)
  1. 你又遇到这个问题了,你是怎么解决的?

  2. 还有一点是我不太喜欢redisTemplate来操作letuce API,所以在找原生Lettuce集群池的配置方案。

  3. 之前有没有做过原集群池的配置或者Api的使用,或者看过详细的Demo文档,如果有请推荐给我(当然我也看了官方文档,我可能需要一个Demo申请学习)