使用Reactor 2.0在Spring 4上执行多线程执行

jba*_*eta 7 java spring multithreading reactor project-reactor

我正在尝试将Reactor 2x集成到现有Spring 4应用程序中以提高执行REST请求期间的性能,其中resources可以相互独立地获取,有点map-reduce我们将作业并行化为多个线程然后将它们连接到缓冲区.

到目前为止,我们已将此示例在非弹簧环境中工作:

    //Ordered resources to apply transformations.
    List<Map<String, Object>> result;

    result = Streams.from(resources)
                    .flatMap(m -> Streams.just(m) .dispatchOn(Environment.cachedDispatcher())
                    .map(resourceToMapFunction::apply))
                    .buffer().next().await(5, TimeUnit.SECONDS);
Run Code Online (Sandbox Code Playgroud)

在上面的示例中,我们使用the应用转换resourceToMapFunction然后使用buffer()方法连接,创建Promise等待结果并返回result.

我的第一个问题是,这是Reactor被假设使用的方式吗?我知道转换是正确应用的,但也许,我是一个Reactor不以正确方式使用东西的新手.

我的第二个问题,并不是什么大问题,但Reactor项目中是否有任何东西以resources输入中提供的相同顺序返回?由于这是在多个线程中执行的,我很确定答案不是,就像我说的那样是我的担忧,但是无论如何都想问.

最后一个问题,当我将此代码引入我的Spring项目时,转换失败,因为Bean应用底层转换的依赖关系不在执行线程中,这是我可以使用Spring Reactor版本轻松完成的事情吗?如果是,是否有任何链接或文档显示如何操作?

非常感谢!

何塞路易斯

ZZ *_*Z 5 1

您应该使用 Reactor 的 API。因此,Stream您应该创建 Reactor,而不是创建 Java Flux。那么你应该使用flatMapa 的函数Flux来获取一些资源。如果您希望资源的顺序与请求的顺序相同,则可以使用flatMapSequential.

所以代码看起来像:

        Flux<Map<String,Object>> result = Flux.fromIterable(resources)
                .flatMapSequential(resourceToMapFunction::apply)
                .take(Duration.ofSeconds(5));
Run Code Online (Sandbox Code Playgroud)

然后您就有了一个 Reactor 流,您可以对该流应用更多操作。如果您想要简单的列表,那么您必须使用运算符来阻止collectList。如果您想控制执行给定函数的线程,那么您必须熟悉Scheduler.