kri*_*ver 13 reactive-programming redis lettuce spring-data-redis project-reactor
我使用spring boot webflux+ project reactor+lettuce连接和非阻塞方式查询Redis的。我已经配置了ReactiveRedisTemplate带LettuceConnectionFactory。spring 文档指出,使用管道的唯一方法ReactiveRedisTemplate是使用该execute(<RedisCallback>)方法。在 non-reactive 中RedisTemplate,我看到有一种executePipelined(<RedisCallback>)方法可以在执行回调之前打开/关闭管道。但是在ReactiveRedisTemplate.execute方法的情况下,它使用 aLettuceReactiveRedisConnection并且既Spring ReactiveRedisConnection没有Lettuce也没有没有对管道的引用。
所以我的问题是,是否可以在使用Spring ReactiveRedisTemplate+时流水线化您的命令ReactiveLettuceConnection?
我也注意到,使用ReactiveRedisTemplate.execute了RedisCallback具有多个Redis命令的执行速度比打电话只是单独的命令慢。
带有 ReactiveRedisTemplate 的管道示例代码:
reactiveRedisTemplate.execute(connection -> keys.flatMap(key ->
connection.hashCommands()
.hGetAll(ByteBuffer.wrap(key.getBytes()))))
.map(Map.Entry::getValue)
.map(ByteUtils::getBytes)
.map(b -> {
try {
return mapper.readValue(b, Value.class);
} catch (IOException e1) {
return null;
}
})
.collectList();
Run Code Online (Sandbox Code Playgroud)
没有管道的代码:
keys.flatMap(key -> reactiveRedisTemplate.opsForHash().entries(key))
.map(Map.Entry::getValue)
.cast(Value.class)
.collectList();
Run Code Online (Sandbox Code Playgroud)
谢谢!
我认为使用 RedisReactiveTemplate 或 lettuce 的反应式 api 是不可能的。事实上,当您构建反应链时,其中的某些部分将被延迟评估。
getAsyncValue(a).flatMap(value -> doSomething(value)).subscribe()
Run Code Online (Sandbox Code Playgroud)
例如,在此示例中,仅当 getAsyncValue 返回值时才会触发 doSomething。
现在,如果我们采用 RedisCallback 示例并假设连接对象中有一个lushAll 方法。你在哪里/什么时候打电话?
tpl.execute(connection -> {
Flux<Map.Entry<ByteBuffer, ByteBuffer>> results = keys.flatMap(key ->
connection.hashCommands()
.hGetAll(ByteBuffer.wrap(key.getBytes())));
connection.fluxAll();
return results;
})
Run Code Online (Sandbox Code Playgroud)
像这样,任何命令都不会被刷新到服务器,因为不会触发 hashCommands。
现在让我们看看我们拥有的所有信号回调:
doOnError 或 doOnCancel 对我们没有帮助。但我们可以考虑使用 doFinally、doOnTerminate、doAfterTerminate:
tpl.execute(connection -> keys.flatMap(key -> connection.hashCommands()
.hGetAll(ByteBuffer.wrap(key.getBytes())))
.doFinally(s -> connection.flushAll()))
Run Code Online (Sandbox Code Playgroud)
但是 htGetAll 在命令刷新到服务器之前不会完成,因此 doFinnaly 不会被调用,所以我们将无法刷新......
我能想到的唯一解决方法是直接使用 Lettuce 的异步 api。文档中有一个关于如何执行此操作的示例。
您的代码可能如下所示(未经测试):
// client is a RedisClient from lettuce
StatefulRedisConnection<String, String> connection = client.connect();
RedisAsyncCommands<String, String> command = connection.async();
command.setAutoFlushCommands(false);
keys.map(command::hgetall)
.collectList()
.doOnNext(f -> command.flushCommands())
.flatMapMany(f -> Flux.fromIterable(f).flatMap(Mono::fromCompletionStage))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1897 次 |
| 最近记录: |