让我们假设控制器的这两种情况产生一些延迟的随机数:
1)Reactive Spring 5反应性应用:
@GetMapping("/randomNumbers")
public Flux<Double> getReactiveRandomNumbers() {
return generateRandomNumbers(10, 500);
}
/**
* Non-blocking randon number generator
* @param amount - # of numbers to generate
* @param delay - delay between each number generation in milliseconds
* @return
*/
public Flux<Double> generateRandomNumbers(int amount, int delay){
return Flux.range(1, amount)
.delayMillis(delay)
.map(i -> Math.random());
}
Run Code Online (Sandbox Code Playgroud)
2)传统的Spring MVC DeferredResult:
@GetMapping("/randomNumbers")
public DeferredResult<Double[]> getReactiveRandomNumbers() {
DeferredResult<Double[]> dr = new DeferredResult<Double[]>();
CompletableFuture.supplyAsync(() -> {
return generateRandomNumbers(10, 500);
}).whenCompleteAsync((p1, p2) -> { …Run Code Online (Sandbox Code Playgroud) 我正试图用来Flux.buffer()从数据库中批量加载。
用例是从数据库加载记录可能是“突发的”,我想引入一个小缓冲区,以便在可能的情况下将加载分组在一起。
我的概念方法是使用某种形式的处理器,发布到它的接收器,让该缓冲区,然后订阅并过滤所需的结果。
我尝试了多种不同的方法(不同类型的处理器,以不同的方式创建过滤后的Mono)。
下面是我到目前为止所到达的地方-主要是绊脚石。
当前,这将返回一个结果,但是后续的调用将被丢弃(尽管我不确定在哪里)。
class BatchLoadingRepository {
// I've tried all manner of different processors here. I'm unsure if
// TopicProcessor is the correct one to use.
private val bufferPublisher = TopicProcessor.create<String>()
private val resultsStream = bufferPublisher
.bufferTimeout(50, Duration.ofMillis(50))
// I'm unsure if concatMapIterable is the correct operator here,
// but it seems to work.
// I'm really trying to turn the List<MyEntity>
// into a stream of MyEntity, published on the Flux<>
.concatMapIterable { requestedIds …Run Code Online (Sandbox Code Playgroud) 当响应为 5xx 时,我想在等待 10 秒后重试请求 3 次。但我没有看到可以使用的方法。在对象上
WebClient.builder()
.baseUrl("...").build().post()
.retrieve().bodyToMono(...)
Run Code Online (Sandbox Code Playgroud)
我可以看到方法:
在重试次数但没有延迟的条件下重试
.retry(3, {it is WebClientResponseException && it.statusCode.is5xxServerError} )
Run Code Online (Sandbox Code Playgroud)
重试退避和次数但没有条件
.retryBackoff
Run Code Online (Sandbox Code Playgroud)
还有一个,retryWhen但我不知道如何使用它
我有以下代码:
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
@Component
public class GreetingHandler
public Mono<ServerResponse> hello(ServerRequest request) {
return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromValue("Hello Spring!"));
}
}
Run Code Online (Sandbox Code Playgroud)
我理解这段代码,除了 Mono 类做什么以及它的功能是什么。我进行了大量搜索,但并没有直截了当:Mono 类是什么以及何时使用它?
我有一个Project Reactor链,它包含一个阻塞任务(网络调用,我们需要等待响应).我想同时运行多个阻塞任务.
似乎可以使用ParallelFlux或flatMap(),裸骨示例:
Flux.just(1)
.repeat(10)
.parallel(3)
.runOn(Schedulers.elastic())
.doOnNext(i -> blockingTask())
.sequential()
.subscribe()
Run Code Online (Sandbox Code Playgroud)
要么
Flux.just(1)
.repeat(10)
.flatMap(i -> Mono.fromCallable(() -> {blockingTask(); return i;}).subscribeOn(Schedulers.elastic()), 3)
.subscribe();
Run Code Online (Sandbox Code Playgroud)
这两种技术的优点是什么?一个比另一个更受欢迎吗?还有其他选择吗?
我有返回的函数,Mono<Boolean>我想将它映射到Mono<Void>(因为这是我在Controller方法中返回的东西).
有没有更好的方法来返回这样的Mono而不是.flatMap { Mono.empty<Void>() }?
我无法使用.map{ null }因为映射功能无法接受nulls.
我在相同的通量上使用 publishOn 和 subscribeOn,如下所示:
System.out.println("*********Calling Concurrency************");
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.map(i -> i * 2)
.log()
.publishOn(Schedulers.elastic())
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);
System.out.println("-------------------------------------");
Run Code Online (Sandbox Code Playgroud)
虽然,当我同时使用两者时,日志中没有打印任何内容。但是当我只使用 publishOn 时,我得到了以下信息日志:
*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Run Code Online (Sandbox Code Playgroud)
是不是publishOn 比subscribeOn 更值得推荐?或者它比 subscribeOn 有更多的偏好?两者有什么区别以及何时使用哪个?
publisher publish-subscribe reactive-programming project-reactor reactive-streams
我使用spring boot webflux+ project reactor+lettuce连接和非阻塞方式查询Redis的。我已经配置了ReactiveRedisTemplate带LettuceConnectionFactory。spring 文档指出,使用管道的唯一方法ReactiveRedisTemplate是使用该execute(<RedisCallback>)方法。在 non-reactive 中RedisTemplate,我看到有一种executePipelined(<RedisCallback>)方法可以在执行回调之前打开/关闭管道。但是在ReactiveRedisTemplate.execute方法的情况下,它使用 aLettuceReactiveRedisConnection并且既Spring ReactiveRedisConnection没有Lettuce也没有没有对管道的引用。
所以我的问题是,是否可以在使用Spring ReactiveRedisTemplate+时流水线化您的命令ReactiveLettuceConnection?
我也注意到,使用ReactiveRedisTemplate.execute了RedisCallback具有多个Redis命令的执行速度比打电话只是单独的命令慢。
带有 ReactiveRedisTemplate 的管道示例代码:
reactiveRedisTemplate.execute(connection -> keys.flatMap(key ->
connection.hashCommands()
.hGetAll(ByteBuffer.wrap(key.getBytes()))))
.map(Map.Entry::getValue)
.map(ByteUtils::getBytes)
.map(b -> {
try {
return mapper.readValue(b, Value.class);
} catch (IOException e1) {
return null;
}
})
.collectList();
Run Code Online (Sandbox Code Playgroud)
没有管道的代码:
keys.flatMap(key -> reactiveRedisTemplate.opsForHash().entries(key))
.map(Map.Entry::getValue) …Run Code Online (Sandbox Code Playgroud) reactive-programming redis lettuce spring-data-redis project-reactor
@GetMapping(path = "/cars", produces = "text/event-stream")
public Flux<Car> getCarStream() {
System.out.println("application/stream+json");
return this.repository.findCarsBy().log();
}
Run Code Online (Sandbox Code Playgroud)
上面的代码和下面的有什么区别:
@GetMapping(path = "/cars", produces = "application/stream+json")
public Flux<Car> getCarStream() {
System.out.println("application/stream+json");
return this.repository.findCarsBy().log();
}
Run Code Online (Sandbox Code Playgroud)
到目前为止,我发现了矛盾的信息:有人说它们都表示服务器发送的事件,而其他人则表示存在差异。
我正在为我的spring-boot应用程序使用WebClient和自定义BodyExtractor类
WebClient webLCient = WebClient.create();
webClient.get()
.uri(url, params)
.accept(MediaType.APPLICATION.XML)
.exchange()
.flatMap(response -> {
return response.body(new BodyExtractor());
})
Run Code Online (Sandbox Code Playgroud)
BodyExtractor.java
@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
Flux<DataBuffer> body = response.getBody();
body.map(dataBuffer -> {
try {
JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
Unmarshaller unmarshaller = jc.createUnmarshaller();
return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
} catch(Exception e){
return null;
}
}).next();
}
Run Code Online (Sandbox Code Playgroud)
上面的代码使用小的有效负载而不是大的有效负载,我认为这是因为我只读取一个通量值,next我不知道如何组合和读取所有dataBuffer.
我是反应堆的新手,所以我不知道很多使用flux/mono的技巧.
project-reactor ×10
java ×5
spring ×5
kotlin ×1
lettuce ×1
publisher ×1
redis ×1
rx-java ×1
spring-boot ×1
spring-mono ×1