使用 Reactive Lettuce 流水线 Redis 命令

kri*_*ver 13 reactive-programming redis lettuce spring-data-redis project-reactor

我使用spring boot webflux+ project reactor+lettuce连接和非阻塞方式查询Redis的。我已经配置了ReactiveRedisTemplateLettuceConnectionFactory。spring 文档指出,使用管道的唯一方法ReactiveRedisTemplate是使用该execute(<RedisCallback>)方法。在 non-reactive 中RedisTemplate,我看到有一种executePipelined(<RedisCallback>)方法可以在执行回调之前打开/关闭管道。但是在ReactiveRedisTemplate.execute方法的情况下,它使用 aLettuceReactiveRedisConnection并且既Spring ReactiveRedisConnection没有Lettuce也没有没有对管道的引用。

所以我的问题是,是否可以在使用Spring ReactiveRedisTemplate+时流水线化您的命令ReactiveLettuceConnection

我也注意到,使用ReactiveRedisTemplate.executeRedisCallback具有多个Redis命令的执行速度比打电话只是单独的命令慢。

带有 ReactiveRedisTemplate 的管道示例代码:

reactiveRedisTemplate.execute(connection -> keys.flatMap(key -> 
                                connection.hashCommands()
                                .hGetAll(ByteBuffer.wrap(key.getBytes()))))
                    .map(Map.Entry::getValue)
                    .map(ByteUtils::getBytes)
                    .map(b -> {
                        try {
                        return mapper.readValue(b, Value.class);
                        } catch (IOException e1) {
                        return null;
                        }
                    })
                    .collectList();
Run Code Online (Sandbox Code Playgroud)

没有管道的代码:

keys.flatMap(key -> reactiveRedisTemplate.opsForHash().entries(key))
            .map(Map.Entry::getValue)
            .cast(Value.class)
            .collectList();
Run Code Online (Sandbox Code Playgroud)

谢谢!

JEY*_*JEY 0

我认为使用 RedisReactiveTemplate 或 lettuce 的反应式 api 是不可能的。事实上,当您构建反应链时,其中的某些部分将被延迟评估。

getAsyncValue(a).flatMap(value -> doSomething(value)).subscribe()

Run Code Online (Sandbox Code Playgroud)

例如,在此示例中,仅当 getAsyncValue 返回值时才会触发 doSomething。

现在,如果我们采用 RedisCallback 示例并假设连接对象中有一个lushAll 方法。你在哪里/什么时候打电话?

tpl.execute(connection -> {
                    Flux<Map.Entry<ByteBuffer, ByteBuffer>> results = keys.flatMap(key ->
                            connection.hashCommands()
                                    .hGetAll(ByteBuffer.wrap(key.getBytes())));
                    connection.fluxAll();
                    return results;
                })
Run Code Online (Sandbox Code Playgroud)

像这样,任何命令都不会被刷新到服务器,因为不会触发 hashCommands。

现在让我们看看我们拥有的所有信号回调:

  • 执行下一步
  • 发生错误时
  • 取消时执行
  • 先做
  • 订阅时执行
  • 请求时执行
  • 终止时执行
  • 终止后执行
  • 最后做
  • 完成时执行

doOnError 或 doOnCancel 对我们没有帮助。但我们可以考虑使用 doFinally、doOnTerminate、doAfterTerminate:

tpl.execute(connection -> keys.flatMap(key -> connection.hashCommands()
                                        .hGetAll(ByteBuffer.wrap(key.getBytes())))
                                        .doFinally(s -> connection.flushAll()))
Run Code Online (Sandbox Code Playgroud)

但是 htGetAll 在命令刷新到服务器之前不会完成,因此 doFinnaly 不会被调用,所以我们将无法刷新......

我能想到的唯一解决方法是直接使用 Lettuce 的异步 api。文档中有一个关于如何执行此操作的示例。

您的代码可能如下所示(未经测试):

// client is a RedisClient from lettuce

StatefulRedisConnection<String, String> connection = client.connect();
RedisAsyncCommands<String, String> command = connection.async();
command.setAutoFlushCommands(false);
keys.map(command::hgetall)
                .collectList()
                .doOnNext(f -> command.flushCommands())
                .flatMapMany(f -> Flux.fromIterable(f).flatMap(Mono::fromCompletionStage))
Run Code Online (Sandbox Code Playgroud)