标签: project-reactor

如何正确读取Flux <DataBuffer>并将其转换为单个inputStream

我正在为我的spring-boot应用程序使用WebClient和自定义BodyExtractor

WebClient webLCient = WebClient.create();
webClient.get()
   .uri(url, params)
   .accept(MediaType.APPLICATION.XML)
   .exchange()
   .flatMap(response -> {
     return response.body(new BodyExtractor());
   })
Run Code Online (Sandbox Code Playgroud)

BodyExtractor.java

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  body.map(dataBuffer -> {
    try {
      JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
      Unmarshaller unmarshaller = jc.createUnmarshaller();

      return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
    } catch(Exception e){
       return null;
    }
  }).next();
}
Run Code Online (Sandbox Code Playgroud)

上面的代码使用小的有效负载而不是大的有效负载,我认为这是因为我只读取一个通量值,next我不知道如何组合和读取所有dataBuffer.

我是反应堆的新手,所以我不知道很多使用flux/mono的技巧.

java spring project-reactor reactive-streams spring-webflux

12
推荐指数
4
解决办法
1万
查看次数

Flux.create和Flux.generate之间的差异

Flux.create和之间有什么区别Flux.generate?我正在寻找 - 理想情况下使用示例用例 - 来了解何时应该使用其中一个.

project-reactor

12
推荐指数
2
解决办法
4637
查看次数

Mono 的 doOnSuccess 与 doOnNext 之间有区别吗?

假设您有Mono<Integer> someIntegerSource = Mono.just(5)并且想要将其分配给一个变量。

这些代码片段之间有区别吗?

案例 1:doOnSuccess

someIntegerSource.doOnSuccess(number -> this.myNumber = number)
Run Code Online (Sandbox Code Playgroud)

案例2:doOnNext

someIntegerSource.doOnNext(number -> this.myNumber = number)
Run Code Online (Sandbox Code Playgroud)

案例 3:doOnSuccess + then(因为我想在发出单声道完成之前完成分配)

someIntegerSource.doOnSuccess(number -> this.myNumber = number).then()
Run Code Online (Sandbox Code Playgroud)

java spring java-8 project-reactor spring-webflux

12
推荐指数
1
解决办法
1万
查看次数

如何在Spring Webflux/WebClient中设置事件循环池大小?

在Vert.X等多反应堆框架中,我们可以设置事件循环线程的数量,例如:

final VertxOptions vertxOptions = new VertxOptions();
vertxOptions.setEventLoopPoolSize(16);
final Vertx myVertx = Vertx.vertx(vertxOptions);
Run Code Online (Sandbox Code Playgroud)

如何在Spring Boot 2 WebFlux/WebClient中进行等效操作?

java spring spring-boot project-reactor spring-webflux

11
推荐指数
1
解决办法
3361
查看次数

当 Flux 从 Spring Web 控制器返回时会发生什么?

我对反应式 API 比较陌生,并且很好奇当我们从 Web 控制器返回 Flux 时幕后发生了什么。

根据 spring-web 文档

响应式返回值的处理方式如下:

适应单值 Promise,类似于使用 DeferredResult。示例包括 Mono (Reactor) 或 Single (RxJava)。

适配流媒体类型(如application/stream+json或text/event-stream)的多值流,类似于使用ResponseBodyEmitter或SseEmitter。示例包括 Flux (Reactor) 或 Observable (RxJava)。应用程序还可以返回 Flux 或 Observable。

与任何其他媒体类型(例如 application/json)的多值流进行适配,类似于使用DeferredResult<List<?>>.

我创建了两个 API,如下所示:

@GetMapping("/async-deferredresult")
public DeferredResult<List<String>> handleReqDefResult(Model model) {
    LOGGER.info("Received async-deferredresult request");
    DeferredResult<List<String>> output = new DeferredResult<>();

    ForkJoinPool.commonPool().submit(() -> {
        LOGGER.info("Processing in separate thread");
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 10000   ; i++) {
            list.add(String.valueOf(i));
        }
        output.setResult(list);
    });

    LOGGER.info("servlet thread freed");
    return output; …
Run Code Online (Sandbox Code Playgroud)

project-reactor spring-webflux

11
推荐指数
1
解决办法
7124
查看次数

谁在响应式 Web 应用程序中调用 Flux 或 Mono 上的订阅

我正在查看一些响应式 Web 应用程序的示例,我看到它们是这样的

@RequestMapping(value = "/{id}", method = RequestMethod.GET)
@ResponseBody    
public Mono<Person> findById(...) {
    return exampleService.findById(...);
}

@RequestMapping(method = RequestMethod.GET, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Person> findAll() {
    Flux<Person> persons = exampleService.findAll();
    return persons;
}
Run Code Online (Sandbox Code Playgroud)

当我在文档中阅读有关 Mono 和 Flux 时,它提到必须为 Mono 或 Flux 调用subscribe以发出数据。

因此,当我在本地运行这些反应式 Web 应用程序并在我点击端点时使用邮递员/chrome 浏览器时,我得到了结果。

在服务端,虽然端点返回 Mono 或 Flux,但我如何在浏览器/邮递员中看到实际结果。每当我点击返回 Mono/Flux 类型的端点时,浏览器是否会在内部调用订阅

reactive-programming project-reactor spring-webflux

11
推荐指数
1
解决办法
3264
查看次数

在 Spring Webflux 响应式项目中何时使用和不使用 @Mock 注解、@MockBean 注解、@InjectMock 注解和 @Autowired 注解

您能否解释一下何时使用以下注释以及何时不使用这些注释。我对测试框架还很陌生,并且对网络上的所有答案感到困惑。

@Mock
private Resource resource;
@MockBean
private Resource resource;
@InjectMock
private ProductService productService; 
@AutoWired
Private ProductRepository productRepo;
Run Code Online (Sandbox Code Playgroud)

java spring springmockito project-reactor spring-webflux

11
推荐指数
1
解决办法
4804
查看次数

如何从ActiveMQ队列创建Spring Reactor Flux?

我正在试验Spring Reactor 3组件和Spring Integration来从JMS队列创建一个反应流(Flux).

我试图从JMS队列(使用Spring Integration的ActiveMQ)创建一个反应流(Spring Reactor 3 Flux),以便客户端异步获取JMS消息.我相信我已正确连接所有内容但客户端在服务器停止之前不会收到任何JMS消息.然后将所有消息"推送"到客户端一次.

任何帮助,将不胜感激.

这是我用来配置JMS,Integration组件和被动发布者的配置文件:

@Configuration
@EnableJms
@EnableIntegration
public class JmsConfiguration {

    @Value("${spring.activemq.broker-url:tcp://localhost:61616}")
    private String defaultBrokerUrl;

    @Value("${queues.patient:patient}")
    private String patientQueue;

    @Autowired
    MessageListenerAdapter messageListenerAdapter;

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, jmsConnectionFactory());
        return factory;
    }

    @Bean
    public Queue patientQueue() {
        return new ActiveMQQueue(patientQueue);

    }

    @Bean
    public ActiveMQConnectionFactory jmsConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(defaultBrokerUrl);
        connectionFactory.setTrustedPackages(Arrays.asList("com.sapinero"));
        return connectionFactory;
    }

    // Set the jackson message converter
    @Bean
    public JmsTemplate jmsTemplate() …
Run Code Online (Sandbox Code Playgroud)

java spring spring-integration reactive-programming project-reactor

10
推荐指数
1
解决办法
5592
查看次数

在Flux和Mono中compose()与transform()vs. as()vs. map()

最近,我决定尝试使用projectreactor.io的春天5 (io.projectreactor:reactor-test:jar:3.1.1).

有谁知道使用这个功能的最佳情况是什么?使用它们以及应该在哪里使用它们有什么缺点和优点?

好的例子会有所帮助.

reactive-programming project-reactor reactive-streams

10
推荐指数
2
解决办法
5485
查看次数

Webflux websocketclient,如何在同一会话中发送多个请求[设计客户端库]

TL; DR;

我们正在尝试使用spring webflux WebSocket实现设计WebSocket服务器.服务器具有通常的HTTP服务器操作,例如create/fetch/update/fetchall.使用WebSockets,我们试图公开一个端点,以便客户端可以利用单个连接进行所有类型的操作,因为WebSockets就是为此目的而设计的.它是一个使用webflux和WebSockets的正确设计吗?

长版

我们正在开始一个将使用反应式网络套接字的项目spring-webflux.我们需要构建一个反应式客户端库,消费者可以使用它来连接到服务器.

在服务器上,我们收到请求,读取消息,保存并返回静态响应:

public Mono<Void> handle(WebSocketSession webSocketSession) {
    Flux<WebSocketMessage> response = webSocketSession.receive()
            .map(WebSocketMessage::retain)
            .concatMap(webSocketMessage -> Mono.just(webSocketMessage)
                    .map(parseBinaryToEvent) //logic to get domain object
                    .flatMap(e -> service.save(e))
                    .thenReturn(webSocketSession.textMessage(SAVE_SUCCESSFUL))
            );

    return webSocketSession.send(response);
}
Run Code Online (Sandbox Code Playgroud)

在客户端,当有人调用save方法并从中返回响应时,我们想要打电话server.

public Mono<String> save(Event message) {
    new ReactorNettyWebSocketClient().execute(uri, session -> {
      session
              .send(Mono.just(session.binaryMessage(formatEventToMessage)))
              .then(session.receive()
                      .map(WebSocketMessage::getPayloadAsText)
                      .doOnNext(System.out::println).then()); //how to return this to client
    });
    return null;
}
Run Code Online (Sandbox Code Playgroud)

我们不确定,如何设计这个.理想情况下,我们认为应该有

1)client.execute应该只调用一次并以某种方式持有session.应使用相同的会话在后续调用中发送数据.

2)如何从我们进入的服务器返回响应session.receive? …

java spring spring-websocket project-reactor spring-webflux

10
推荐指数
1
解决办法
1543
查看次数