标签: project-reactor

Akka或Reactor

我正在开始一个新项目(基于java).我需要将其构建为模块化,分布式和弹性架构.

因此,我希望业务流程能够相互通信,具有互操作性,但也是独立的.

我现在正在寻找两个框架,除了它们的年龄差异外,还表达了两种不同的观点:

选择上述框架之一时我应该考虑什么?

据我所知,到目前为止,Akka仍然以某种方式耦合(在某种程度上我必须'选择'我要发送消息的演员),但非常有弹性.虽然Reactor是松散的(基于事件发布).

有人可以帮我理解如何做出正确的决定吗?

UPDATE

在更好地回顾了Akka 的事件总线之后,我相信Reactor表达功能已经在某种程度上已经包含在Akka中.

例如,https://github.com/reactor/reactor#events-selectors-and-consumers上记录的订阅和事件发布可以在Akka中表示如下:

final ActorSystem system = ActorSystem.create("system");
final ActorRef actor = system.actorOf(new Props(
    new UntypedActorFactory() {

        @Override
        public Actor create() throws Exception {

            return new UntypedActor() {
                final LoggingAdapter log = Logging.getLogger(
                        getContext().system(), this);

                @Override
                public void onReceive(Object message)
                        throws Exception {
                    if (message instanceof String)
                        log.info("Received String message: {}",
                                message);
                    else
                        unhandled(message);
                }
            };
        }
    }), "actor"); …
Run Code Online (Sandbox Code Playgroud)

java spring reactor akka project-reactor

90
推荐指数
3
解决办法
3万
查看次数

在Spring中使用什么样的"EventBus"?内置,Reactor,Akka?

我们将在几周内开始新的Spring 4应用程序.我们想使用一些事件驱动的架构.今年我在这里和那里读到关于"Reactor"的信息,在网上搜索时,我偶然发现了"Akka".

所以现在我们有3个选择:

我无法找到真正的比较.


现在我们只需要:

  • X 注册听 Event E
  • Y 注册听 Event E
  • Z 发送一个 Event E

然后X,Y将接收并处理该事件.

我们很可能会以异步方式使用它,但肯定会有一些同步方案.我们很可能总是将一个类作为事件发送.(Reactor样本主要使用字符串和字符串模式,但它也支持对象).


据我所知,ApplicationEvent默认情况下同步Reactor工作并以异步方式工作.并且Reactor还允许使用该await()方法使其有点同步.Akka提供或多或少相同Reactor,但也支持Remoting.

关于Reactor的await()方法:它可以等待多个线程完成吗?或者甚至可能是这些线程的一部分?如果我们从上面举例:

  • X 注册听 Event E
  • Y 注册听 Event E
  • Z 发送一个 Event E

可以通过说:等待X Y完成来使其同步.它是否有可能让它等待X,但不是为了Y


也许还有一些替代品?例如JMS呢?

很多问题,但希望你能提供一些答案!

谢谢!


编辑:示例用例 …

spring multithreading event-driven-design akka project-reactor

40
推荐指数
2
解决办法
2万
查看次数

Mono vs Flux in Reactive Stream

根据文件:

Flux是一个可以发出0..N元素的流:

Flux<String> fl = Flux.just("a", "b", "c");
Run Code Online (Sandbox Code Playgroud)

Mono是一个0..1元素的流:

Mono<String> mn = Mono.just("hello");
Run Code Online (Sandbox Code Playgroud)

因为两者都是反应流中Publisher接口的实现.

我们不能在大多数情况下只使用Flux,因为它也可以发出0..1,从而满足Mono的条件?

或者只有一些特定的条件,只需要使用Mono并且Flux无法处理操作?请建议.

reactive-programming project-reactor reactive-streams

30
推荐指数
3
解决办法
2万
查看次数

Java反应框架的比较

我看到许多框架/库声称他们可以帮助用Java构建反应式应用程序,例如:Akka,Vert.x,RxJava,Reactor,QBit等.

他们似乎有不同的方法,功能,优点,缺点等.我找不到详细的比较.有关于这些框架中的每一个的文档,但我不能理解这些差异.

主要Java反应框架之间有什么区别?

什么是可以推动Java反应式框架选择的应用程序需求?

感谢您的时间.

java reactive-programming akka rx-java project-reactor

29
推荐指数
1
解决办法
1万
查看次数

在项目反应堆或akka流中,接收器和用户之间的概念差异是什么?

接收器和用户的概念看起来与我类似.此外,我没有看到在反应流规范中明确定义接收器的概念.

project-reactor reactive-streams akka-stream

29
推荐指数
1
解决办法
3267
查看次数

Spring WebFlux和Reactor的线程模型

目前正在尝试使用Spring 5.0.0.RC2,Reactor 3.1.0.M2Spring Boot 2.0.0.M2进行反应式编程.

想知道WebFlux和Reactor使用的并发和线程模型来正确编写应用程序并处理可变状态.

Reactor doc声明该库被认为是并发不可知的,并提到了Scheduler抽象.WebFlux文档不提供信息.

然而,当通过Spring Boot使用WebFlux时,定义了一个线程模型.

从我的实验中得到的是:

  • 该模型既不是1个事件线程,也不是1个事件线程+工作者
  • 使用了几个线程池
  • " reactor-http-nio-3 "线程:可能每个核心一个,处理传入的HTTP请求
  • " Thread-7 "线程:由对MongoDB或HTTP资源的异步请求使用
  • " parallel-1 "线程:每个核心一个,由Reactor的Schedulers.parallel()创建,由延迟运算符等使用
  • 共享可变状态必须由应用程序同步
  • ThreadLocal(用于应用程序状态,MDC日志记录等)不是请求作用域,因此不是很有趣

它是否正确 ?什么是WebFlux的并发和线程模型:例如,什么是默认线程池?

感谢您的信息

java multithreading reactive-programming project-reactor spring-webflux

23
推荐指数
1
解决办法
9518
查看次数

用Reactor抛出异常的正确方法

我是新手,一般都会对Reactor和反应式编程进行预测.

我目前正在编写一段类似于此的代码:

Mono.just(userId)
    .map(repo::findById)
    .map(user-> {
        if(user == null){
            throw new UserNotFoundException();
        }
        return user;
    })
    // ... other mappings
Run Code Online (Sandbox Code Playgroud)

这个例子可能很愚蠢,实现这种情况肯定有更好的方法,但重点是:

throw newmap块中使用异常是否错误,或者我应该用return Mono.error(new UserNotFoundException())?替换它?

这两种做法有什么实际区别吗?

java reactor reactive-programming project-reactor

23
推荐指数
1
解决办法
8638
查看次数

如何处理 Reactive Spring webflux 中的错误

由于有很多方法onErrorReturn,例如onErrorResume等,那么哪一个是正确的方法来处理 Reactive Spring WebFlux 中单声道和通量的错误?

java project-reactor spring-webflux

23
推荐指数
1
解决办法
3万
查看次数

如何在执行Flux.map()时处理错误

我试图找出在Flux中映射元素时如何处理错误.

例如,我将CSV字符串解析为我的一个业务POJO:

myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));
Run Code Online (Sandbox Code Playgroud)

其中一些行可能包含错误,因此我在日志中得到的是:

 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
 reactor.core.publisher.FluxLog:  onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
 reactor.core.publisher.FluxLog:  java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo
Run Code Online (Sandbox Code Playgroud)

我在API中读到了一些错误处理方法,但大多数都提到返回"错误值"或使用回退Flux,如下所示:

Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);
Run Code Online (Sandbox Code Playgroud)

然而,用我的myflux意思是再次处理整个通量.

那么,有没有办法在处理特定元素时处理错误(即忽略它们/记录它们)并继续处理其余的通量?

使用@akarnokd解决方法进行更新

public Flux<StockQuotation> getQuotes(List<String> tickers)
{
    Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
    // Get each set of quotes in a separate thread
    .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
    // Convert each list …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming project-reactor

22
推荐指数
3
解决办法
1万
查看次数

反应式编程优点/缺点

我一直在研究并尝试使用Reactor和RxJava进行编码的Reactive Style.我确实理解,与单线程执行相比,反应式编码可以更好地利用CPU.

在基于Web的应用程序中,反应式编程与命令式编程之间是否有任何具体比较?

通过对非反应式编程使用反应式编程,我实现了多少性能提升和吞吐量?

还原反应编程有哪些优缺点?

有没有统计基准?

java reactive-programming project-reactor microservices rx-java2

21
推荐指数
3
解决办法
1万
查看次数