标签: project-reactor

Web反应式编程 - 从HTTP客户端的角度来看,有哪些优势?

让我们假设控制器的这两种情况产生一些延迟的随机数:

1)Reactive Spring 5反应性应用:

@GetMapping("/randomNumbers")
public Flux<Double> getReactiveRandomNumbers() {
    return generateRandomNumbers(10, 500);
}

/**
 * Non-blocking randon number generator
 * @param amount - # of numbers to generate
 * @param delay - delay between each number generation in milliseconds
 * @return
 */
public Flux<Double> generateRandomNumbers(int amount, int delay){
    return Flux.range(1, amount)
               .delayMillis(delay)
               .map(i -> Math.random());
}
Run Code Online (Sandbox Code Playgroud)

2)传统的Spring MVC DeferredResult:

@GetMapping("/randomNumbers")
public DeferredResult<Double[]> getReactiveRandomNumbers() {
    DeferredResult<Double[]> dr = new DeferredResult<Double[]>();

    CompletableFuture.supplyAsync(() -> {
        return generateRandomNumbers(10, 500);
    }).whenCompleteAsync((p1, p2) -> { …
Run Code Online (Sandbox Code Playgroud)

spring reactive-programming rx-java project-reactor

14
推荐指数
1
解决办法
2248
查看次数

使用反应堆的Flux.buffer批处理工作仅适用于单个项目

我正试图用来Flux.buffer()从数据库中批量加载。

用例是从数据库加载记录可能是“突发的”,我想引入一个小缓冲区,以便在可能的情况下将加载分组在一起。

我的概念方法是使用某种形式的处理器,发布到它的接收器,让该缓冲区,然后订阅并过滤所需的结果。

我尝试了多种不同的方法(不同类型的处理器,以不同的方式创建过滤后的Mono)。

下面是我到目前为止所到达的地方-主要是绊脚石。

当前,这将返回一个结果,但是后续的调用将被丢弃(尽管我不确定在哪里)。

class BatchLoadingRepository {
    // I've tried all manner of different processors here.  I'm unsure if
    // TopicProcessor is the correct one to use.
    private val bufferPublisher = TopicProcessor.create<String>()
    private val resultsStream = bufferPublisher
            .bufferTimeout(50, Duration.ofMillis(50))
            // I'm unsure if concatMapIterable is the correct operator here, 
            // but it seems to work.
            // I'm really trying to turn the List<MyEntity> 
            // into a stream of MyEntity, published on the Flux<>
            .concatMapIterable { requestedIds …
Run Code Online (Sandbox Code Playgroud)

java kotlin project-reactor reactive-streams

14
推荐指数
1
解决办法
270
查看次数

spring webclient:重试特定错误时退避

当响应为 5xx 时,我想在等待 10 秒后重试请求 3 次。但我没有看到可以使用的方法。在对象上

WebClient.builder()
                .baseUrl("...").build().post()
                .retrieve().bodyToMono(...)
Run Code Online (Sandbox Code Playgroud)

我可以看到方法:

在重试次数但没有延迟的条件下重试

.retry(3, {it is WebClientResponseException && it.statusCode.is5xxServerError} )
Run Code Online (Sandbox Code Playgroud)

重试退避和次数但没有条件

.retryBackoff 
Run Code Online (Sandbox Code Playgroud)

还有一个,retryWhen但我不知道如何使用它

java spring project-reactor spring-webflux spring-webclient

14
推荐指数
3
解决办法
2万
查看次数

Java 中的 Mono 类:是什么,何时使用?

我有以下代码:

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

@Component
public class GreetingHandler 
    public Mono<ServerResponse> hello(ServerRequest request) {
        return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
        .body(BodyInserters.fromValue("Hello Spring!"));
    }
}
Run Code Online (Sandbox Code Playgroud)

我理解这段代码,除了 Mono 类做什么以及它的功能是什么。我进行了大量搜索,但并没有直截了当:Mono 类是什么以及何时使用它

java spring spring-boot project-reactor spring-mono

14
推荐指数
1
解决办法
2万
查看次数

用于阻塞I/O任务的ParallelFlux与flatMap()

我有一个Project Reactor链,它包含一个阻塞任务(网络调用,我们需要等待响应).我想同时运行多个阻塞任务.

似乎可以使用ParallelFlux或flatMap(),裸骨示例:

Flux.just(1)
    .repeat(10)
    .parallel(3)
    .runOn(Schedulers.elastic())
    .doOnNext(i -> blockingTask())
    .sequential()
    .subscribe()
Run Code Online (Sandbox Code Playgroud)

要么

Flux.just(1)
    .repeat(10)
    .flatMap(i -> Mono.fromCallable(() -> {blockingTask(); return i;}).subscribeOn(Schedulers.elastic()), 3)
    .subscribe();
Run Code Online (Sandbox Code Playgroud)

这两种技术的优点是什么?一个比另一个更受欢迎吗?还有其他选择吗?

project-reactor reactive-streams

13
推荐指数
1
解决办法
2833
查看次数

Reactor将Mono <Boolean>映射到Mono <Void>

我有返回的函数,Mono<Boolean>我想将它映射到Mono<Void>(因为这是我在Controller方法中返回的东西).

有没有更好的方法来返回这样的Mono而不是.flatMap { Mono.empty<Void>() }

我无法使用.map{ null }因为映射功能无法接受nulls.

spring project-reactor

13
推荐指数
1
解决办法
6890
查看次数

Project Reactor 3 中的 publishOn 与 subscribeOn

我在相同的通量上使用 publishOn 和 subscribeOn,如下所示:

    System.out.println("*********Calling Concurrency************");
    List<Integer> elements = new ArrayList<>();
    Flux.just(1, 2, 3, 4)
      .map(i -> i * 2)
      .log()
      .publishOn(Schedulers.elastic())
      .subscribeOn(Schedulers.parallel())
      .subscribe(elements::add);
    System.out.println("-------------------------------------");
Run Code Online (Sandbox Code Playgroud)

虽然,当我同时使用两者时,日志中没有打印任何内容。但是当我只使用 publishOn 时,我得到了以下信息日志:

*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Run Code Online (Sandbox Code Playgroud)

是不是publishOn 比subscribeOn 更值得推荐?或者它比 subscribeOn 有更多的偏好?两者有什么区别以及何时使用哪个?

publisher publish-subscribe reactive-programming project-reactor reactive-streams

13
推荐指数
2
解决办法
1万
查看次数

使用 Reactive Lettuce 流水线 Redis 命令

我使用spring boot webflux+ project reactor+lettuce连接和非阻塞方式查询Redis的。我已经配置了ReactiveRedisTemplateLettuceConnectionFactory。spring 文档指出,使用管道的唯一方法ReactiveRedisTemplate是使用该execute(<RedisCallback>)方法。在 non-reactive 中RedisTemplate,我看到有一种executePipelined(<RedisCallback>)方法可以在执行回调之前打开/关闭管道。但是在ReactiveRedisTemplate.execute方法的情况下,它使用 aLettuceReactiveRedisConnection并且既Spring ReactiveRedisConnection没有Lettuce也没有没有对管道的引用。

所以我的问题是,是否可以在使用Spring ReactiveRedisTemplate+时流水线化您的命令ReactiveLettuceConnection

我也注意到,使用ReactiveRedisTemplate.executeRedisCallback具有多个Redis命令的执行速度比打电话只是单独的命令慢。

带有 ReactiveRedisTemplate 的管道示例代码:

reactiveRedisTemplate.execute(connection -> keys.flatMap(key -> 
                                connection.hashCommands()
                                .hGetAll(ByteBuffer.wrap(key.getBytes()))))
                    .map(Map.Entry::getValue)
                    .map(ByteUtils::getBytes)
                    .map(b -> {
                        try {
                        return mapper.readValue(b, Value.class);
                        } catch (IOException e1) {
                        return null;
                        }
                    })
                    .collectList();
Run Code Online (Sandbox Code Playgroud)

没有管道的代码:

keys.flatMap(key -> reactiveRedisTemplate.opsForHash().entries(key))
            .map(Map.Entry::getValue) …
Run Code Online (Sandbox Code Playgroud)

reactive-programming redis lettuce spring-data-redis project-reactor

13
推荐指数
1
解决办法
1897
查看次数

“文本/事件流”和“应用程序/流+json”有什么区别

@GetMapping(path = "/cars", produces = "text/event-stream")
public Flux<Car> getCarStream() {
    System.out.println("application/stream+json");
    return this.repository.findCarsBy().log();
}
Run Code Online (Sandbox Code Playgroud)

上面的代码和下面的有什么区别:

@GetMapping(path = "/cars", produces = "application/stream+json")
public Flux<Car> getCarStream() {
    System.out.println("application/stream+json");
    return this.repository.findCarsBy().log();
}
Run Code Online (Sandbox Code Playgroud)

到目前为止,我发现了矛盾的信息:有人说它们都表示服务器发送的事件,而其他人则表示存在差异。

java project-reactor spring-webflux

13
推荐指数
1
解决办法
1万
查看次数

如何正确读取Flux <DataBuffer>并将其转换为单个inputStream

我正在为我的spring-boot应用程序使用WebClient和自定义BodyExtractor

WebClient webLCient = WebClient.create();
webClient.get()
   .uri(url, params)
   .accept(MediaType.APPLICATION.XML)
   .exchange()
   .flatMap(response -> {
     return response.body(new BodyExtractor());
   })
Run Code Online (Sandbox Code Playgroud)

BodyExtractor.java

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  body.map(dataBuffer -> {
    try {
      JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
      Unmarshaller unmarshaller = jc.createUnmarshaller();

      return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
    } catch(Exception e){
       return null;
    }
  }).next();
}
Run Code Online (Sandbox Code Playgroud)

上面的代码使用小的有效负载而不是大的有效负载,我认为这是因为我只读取一个通量值,next我不知道如何组合和读取所有dataBuffer.

我是反应堆的新手,所以我不知道很多使用flux/mono的技巧.

java spring project-reactor reactive-streams spring-webflux

12
推荐指数
4
解决办法
1万
查看次数