标签: project-reactor

如何从ActiveMQ队列创建Spring Reactor Flux?

我正在试验Spring Reactor 3组件和Spring Integration来从JMS队列创建一个反应流(Flux).

我试图从JMS队列(使用Spring Integration的ActiveMQ)创建一个反应流(Spring Reactor 3 Flux),以便客户端异步获取JMS消息.我相信我已正确连接所有内容但客户端在服务器停止之前不会收到任何JMS消息.然后将所有消息"推送"到客户端一次.

任何帮助,将不胜感激.

这是我用来配置JMS,Integration组件和被动发布者的配置文件:

@Configuration
@EnableJms
@EnableIntegration
public class JmsConfiguration {

    @Value("${spring.activemq.broker-url:tcp://localhost:61616}")
    private String defaultBrokerUrl;

    @Value("${queues.patient:patient}")
    private String patientQueue;

    @Autowired
    MessageListenerAdapter messageListenerAdapter;

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, jmsConnectionFactory());
        return factory;
    }

    @Bean
    public Queue patientQueue() {
        return new ActiveMQQueue(patientQueue);

    }

    @Bean
    public ActiveMQConnectionFactory jmsConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(defaultBrokerUrl);
        connectionFactory.setTrustedPackages(Arrays.asList("com.sapinero"));
        return connectionFactory;
    }

    // Set the jackson message converter
    @Bean
    public JmsTemplate jmsTemplate() …
Run Code Online (Sandbox Code Playgroud)

java spring spring-integration reactive-programming project-reactor

10
推荐指数
1
解决办法
5592
查看次数

如何对 StepVerifier 中的所有剩余元素执行断言?

StepVerifier 有一个assertNext方法允许对下一个元素的值执行断言。

    StepVerifier.create(dataFlux)
            .assertNext(v -> checkValue(v)
            .verifyComplete();
Run Code Online (Sandbox Code Playgroud)

对每个剩余元素执行断言的好方法是什么(例如检查每个元素是否为正)?我希望有类似assertEveryNextElement方法的东西。

java unit-testing project-reactor

10
推荐指数
2
解决办法
7528
查看次数

bean无法作为'Type'注入,因为它是一个JDK动态代理,它实现了:reactor.fn.Consumer

使用Reactor 2的My Spring 4应用程序无法启动:

***************************
APPLICATION FAILED TO START
***************************

Description:

The bean 'orderHandlerConsumer' could not be injected as a 'fm.data.repository.OrderHandlerConsumer' because it is a JDK dynamic proxy that implements:
    reactor.fn.Consumer


Action:

Consider injecting the bean as one of its interfaces or forcing the use of CGLib-based proxies by setting proxyTargetClass=true on @EnableAsync and/or @EnableCaching.
Run Code Online (Sandbox Code Playgroud)

OrderHandlerConsumer很简单:

@Service
@Order(Ordered.HIGHEST_PRECEDENCE)
public class OrderHandlerConsumer implements Consumer<Event<OrderEnvelope>> {
    @Override
    public void accept(Event<OrderEnvelope> event) {
        event.getData().getLatch().countDown();
    }
}
Run Code Online (Sandbox Code Playgroud)

什么想法可能会出错?

java spring project-reactor

10
推荐指数
2
解决办法
7910
查看次数

在Flux和Mono中compose()与transform()vs. as()vs. map()

最近,我决定尝试使用projectreactor.io的春天5 (io.projectreactor:reactor-test:jar:3.1.1).

有谁知道使用这个功能的最佳情况是什么?使用它们以及应该在哪里使用它们有什么缺点和优点?

好的例子会有所帮助.

reactive-programming project-reactor reactive-streams

10
推荐指数
2
解决办法
5485
查看次数

是否可以并行启动Mono并汇总结果

我知道可以链接Mono的,例如......

Mono<String> resultAMono = loadA();
Mono<String> resultBMono = resultA.flatMap(resultA -> loadB());
Run Code Online (Sandbox Code Playgroud)

这将链和resultBMono将在resultAMono返回时运行....

所以我的问题是,是否有可能并行启动2个单声道并且当两个单声道继续使用另一个单声道时?

我认为它看起来像这样......

Mono<String> resultAMono = loadA();
Mono<String> resuktBMono = loadB();
Mono<Tuple2<Stirng, String> tupleMono = Mono.zip(resultAMono, resultBMono);
Run Code Online (Sandbox Code Playgroud)

但是我不知道这将并行运行或者我可以做什么来并行运行...

谢谢答案....

project-reactor spring-webflux

10
推荐指数
1
解决办法
4837
查看次数

如何使用Reactor的StepVerifier来验证Mono是否为空?

StepVerifier用来测试值:

@Test
public void testStuff() {
    Thing thing = new Thing();
    Mono<Thing> result = Mono.just(thing);
    StepVerifier.create(result).consumeNextWith(r -> {
        assertEquals(thing, r);
    }).verifyComplete();
}
Run Code Online (Sandbox Code Playgroud)

我现在要做的是测试单声道中没有项目.像这样:

@Test
public void testNoStuff() {
    Mono<Thing> result = Mono.empty();
    StepVerifier.create(result)... // what goes here?
}
Run Code Online (Sandbox Code Playgroud)

我想测试Mono实际上是空的.我怎么做?

java unit-testing reactive-programming project-reactor

10
推荐指数
2
解决办法
3123
查看次数

在 Spring WebFlux Web 应用程序中缓存来自 WebClient 调用的 Mono 的结果

我正在寻找缓存Mono(仅当它成功时)这是 WebClient 调用的结果。

通过阅读项目反应器插件文档,我不觉得CacheMono很合适,因为它也缓存了我不想要的错误。

因此,CacheMono我没有使用,而是执行以下操作:

Cache<MyRequestObject, Mono<MyResponseObject>> myCaffeineCache = 
    Caffeine.newBuilder()
            .maximumSize(100)
            .expireAfterWrite(Duration.ofSeconds(60))
            .build();

MyRequestObject myRequestObject = ...;

Mono<MyResponseObject> myResponseObject = myCaffeineCache.get(myRequestObject,
    requestAsKey -> WebClient.create()
                             .post()
                             .uri("http://www.example.com")
                             .syncBody(requestAsKey)
                             .retrieve()
                             .bodyToMono(MyResponseObject.class)
                             .cache()
                             .doOnError(t -> myCaffeineCache.invalidate(requestAsKey)));
Run Code Online (Sandbox Code Playgroud)

在这里,我调用缓存Mono,然后将其添加到咖啡因缓存中。

任何错误都会输入doOnError以使缓存无效。

这是缓存MonoWebClient 响应的有效方法吗?

project-reactor spring-webflux

10
推荐指数
2
解决办法
8772
查看次数

何时使用 Mono&lt;List&lt;Object&gt;&gt; 以及何时使用 Flux&lt;Object&gt; 作为 RestController 方法

我正在将 Spring web-flux 与 Reactor 一起使用,但我不清楚 RestController 方法何时应该返回

Mono <List<Object>>Flux<Object>.

你能提供一些什么时候使用它们的例子吗?

spring spring-boot project-reactor reactor-netty spring-webflux

10
推荐指数
1
解决办法
4605
查看次数

Webflux websocketclient,如何在同一会话中发送多个请求[设计客户端库]

TL; DR;

我们正在尝试使用spring webflux WebSocket实现设计WebSocket服务器.服务器具有通常的HTTP服务器操作,例如create/fetch/update/fetchall.使用WebSockets,我们试图公开一个端点,以便客户端可以利用单个连接进行所有类型的操作,因为WebSockets就是为此目的而设计的.它是一个使用webflux和WebSockets的正确设计吗?

长版

我们正在开始一个将使用反应式网络套接字的项目spring-webflux.我们需要构建一个反应式客户端库,消费者可以使用它来连接到服务器.

在服务器上,我们收到请求,读取消息,保存并返回静态响应:

public Mono<Void> handle(WebSocketSession webSocketSession) {
    Flux<WebSocketMessage> response = webSocketSession.receive()
            .map(WebSocketMessage::retain)
            .concatMap(webSocketMessage -> Mono.just(webSocketMessage)
                    .map(parseBinaryToEvent) //logic to get domain object
                    .flatMap(e -> service.save(e))
                    .thenReturn(webSocketSession.textMessage(SAVE_SUCCESSFUL))
            );

    return webSocketSession.send(response);
}
Run Code Online (Sandbox Code Playgroud)

在客户端,当有人调用save方法并从中返回响应时,我们想要打电话server.

public Mono<String> save(Event message) {
    new ReactorNettyWebSocketClient().execute(uri, session -> {
      session
              .send(Mono.just(session.binaryMessage(formatEventToMessage)))
              .then(session.receive()
                      .map(WebSocketMessage::getPayloadAsText)
                      .doOnNext(System.out::println).then()); //how to return this to client
    });
    return null;
}
Run Code Online (Sandbox Code Playgroud)

我们不确定,如何设计这个.理想情况下,我们认为应该有

1)client.execute应该只调用一次并以某种方式持有session.应使用相同的会话在后续调用中发送数据.

2)如何从我们进入的服务器返回响应session.receive? …

java spring spring-websocket project-reactor spring-webflux

10
推荐指数
1
解决办法
1543
查看次数

Mono vs CompletableFuture

CompletableFuture在单独的线程上执行任务(使用线程池)并提供回调函数.假设我有一个API调用CompletableFuture.那是一个API调用阻塞吗?线程是否会被阻塞,直到它没有得到API的响应?(我知道主线程/ tomcat线程将是非阻塞的,但是CompletableFuture任务正在执行的线程呢?)

据我所知,单声道完全没有阻挡.

如果我错了,请详细说明并纠正我.

java reactive-programming project-reactor completable-future

10
推荐指数
2
解决办法
1735
查看次数