标签: reactor

Akka或Reactor

我正在开始一个新项目(基于java).我需要将其构建为模块化,分布式和弹性架构.

因此,我希望业务流程能够相互通信,具有互操作性,但也是独立的.

我现在正在寻找两个框架,除了它们的年龄差异外,还表达了两种不同的观点:

选择上述框架之一时我应该考虑什么?

据我所知,到目前为止,Akka仍然以某种方式耦合(在某种程度上我必须'选择'我要发送消息的演员),但非常有弹性.虽然Reactor是松散的(基于事件发布).

有人可以帮我理解如何做出正确的决定吗?

UPDATE

在更好地回顾了Akka 的事件总线之后,我相信Reactor表达功能已经在某种程度上已经包含在Akka中.

例如,https://github.com/reactor/reactor#events-selectors-and-consumers上记录的订阅和事件发布可以在Akka中表示如下:

final ActorSystem system = ActorSystem.create("system");
final ActorRef actor = system.actorOf(new Props(
    new UntypedActorFactory() {

        @Override
        public Actor create() throws Exception {

            return new UntypedActor() {
                final LoggingAdapter log = Logging.getLogger(
                        getContext().system(), this);

                @Override
                public void onReceive(Object message)
                        throws Exception {
                    if (message instanceof String)
                        log.info("Received String message: {}",
                                message);
                    else
                        unhandled(message);
                }
            };
        }
    }), "actor"); …
Run Code Online (Sandbox Code Playgroud)

java spring reactor akka project-reactor

90
推荐指数
3
解决办法
3万
查看次数

Netflix RxJava与Spring Reactor

我正在评估反应堆库,以便在我们的项目中使用它.我用Google搜索了很多,但找不到Netflix的RxJava和Spring的反应器API之间的差异(优点/缺点).有人可以帮我决定或提供一些指示吗?

谢谢.

reactor

39
推荐指数
1
解决办法
1万
查看次数

什么是工作线程,它们在反应堆模式中的作用是什么?

我正试图进行Reactor模式(并发),但在很多例子中他们都在讨论'工作线程'.什么是工作线程?它们与"正常"线程的区别是什么?他们在反应堆模式中的作用是什么?

java multithreading design-patterns reactor

24
推荐指数
2
解决办法
2万
查看次数

用Reactor抛出异常的正确方法

我是新手,一般都会对Reactor和反应式编程进行预测.

我目前正在编写一段类似于此的代码:

Mono.just(userId)
    .map(repo::findById)
    .map(user-> {
        if(user == null){
            throw new UserNotFoundException();
        }
        return user;
    })
    // ... other mappings
Run Code Online (Sandbox Code Playgroud)

这个例子可能很愚蠢,实现这种情况肯定有更好的方法,但重点是:

throw newmap块中使用异常是否错误,或者我应该用return Mono.error(new UserNotFoundException())?替换它?

这两种做法有什么实际区别吗?

java reactor reactive-programming project-reactor

23
推荐指数
1
解决办法
8638
查看次数

Mono<Void> 和 Mono.empty() 有何不同

据我了解,在 Spring WebFlux 反应器中

Mono<Void>指的是 void Mono

Mono.empty()指的是 void,因为在此之上调用任何内容都会给出空指针。

它们的用法有何不同?

java spring reactor reactive-programming spring-webflux

23
推荐指数
1
解决办法
3万
查看次数

Python的Twisted Reactor如何工作?

最近,我一直在深入研究Twisted文档.从我收集的内容来看,Twisted功能的基础是它的事件循环称为"Reactor".reactor侦听某些事件并将它们分派给已设计用于处理这些事件的已注册回调函数.在书中,有一些伪代码描述了Reactor的功能,但我无法理解它,它对我没有任何意义.

 while True:
     timeout = time_until_next_timed_event()
     events = wait_for_events(timeout)
     events += timed_events_until(now())
     for event in events:
         event.process()
Run Code Online (Sandbox Code Playgroud)

这是什么意思?

python twisted event-loop reactor

20
推荐指数
1
解决办法
7002
查看次数

一个应用中的多个反应器(主回路)通过螺纹(或替代方式)

我已经知道了一个应用程序,我想继续学习更多关于TwistedWebSockets的知识.我正在考虑将以前编写的IRC Bot集成到Web应用程序中.据我所知,我需要三个反应器来使它工作:

  • 主反应堆:Web服务器(HTTP).这将是你的平均twisted.web应用程序.当您访问它时,您可以将IRC服务器/通道POST到连接.然后,Web服务器将与不同线程中的不同反应器通信,这是...
  • 二级反应堆:IRC Bot.这将是通过Twisted IRC客户端协议运行的IRC机器人.它会加入一个频道,每当说出某些内容时,它会将这些数据推送到另一个反应堆,在另一个线程上,这是......
  • 第三级反应堆:WebSocket服务器(WS):由于WebSockets不使用常规HTTP协议,它们需要自己的服务器(或者看起来像这样的例子.当IRC机器人收到消息时,它告诉WebSocket服务器将该消息推送到连接的客户端.

在我看来,这是有道理的.似乎有可能.有没有人有任何多个反应器在不同的线程中运行的例子,或者这是我想象的那些在扭曲的当前版本中无法完成的事情.

是否可以(或应该)进行任何架构更改以最小化反应器数量等?

谢谢你的帮助.

python events multithreading twisted reactor

19
推荐指数
2
解决办法
8139
查看次数

在某种条件下停止扭曲的反应堆

有没有办法在达到某种条件时停止扭曲的反应堆.例如,如果变量设置为某个值,那么反应堆应该停止吗?

twisted reactor

18
推荐指数
2
解决办法
2万
查看次数

Spring 5 webflux如何在Webclient上设置超时

我正在尝试在我的WebClient上设置超时,这是当前代码:

SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();

ClientHttpConnector httpConnector = new ReactorClientHttpConnector(opt -> {
    opt.sslContext(sslContext);
    HttpClientOptions option = HttpClientOptions.builder().build();
    opt.from(option);
});
return WebClient.builder().clientConnector(httpConnector).defaultHeader("Authorization", xxxx)
                .baseUrl(this.opusConfig.getBaseURL()).build();
Run Code Online (Sandbox Code Playgroud)

我需要添加超时和汇集策略,我正在考虑这样的事情:

PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
cm.setMaxTotal(this.applicationConfig.getHttpClientMaxPoolSize());
cm.setDefaultMaxPerRoute(this.applicationConfig.getHttpClientMaxPoolSize());
cm.closeIdleConnections(this.applicationConfig.getServerIdleTimeout(), TimeUnit.MILLISECONDS);

RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(this.applicationConfig.getHttpClientSocketTimeout())
        .setConnectTimeout(this.applicationConfig.getHttpClientConnectTimeout())
        .setConnectionRequestTimeout(this.applicationConfig.getHttpClientRequestTimeout()).build();

CloseableHttpClient httpClient = HttpClients.custom().setDefaultRequestConfig(requestConfig).setConnectionManager(cm).build();
Run Code Online (Sandbox Code Playgroud)

但我无法弄清楚如何在我的webclient中设置httpClient

spring reactor reactor-netty spring-webflux

15
推荐指数
4
解决办法
1万
查看次数

如何在基于 Spring 的反应式应用程序中从身份验证中排除路径?

在非反应式 spring 应用程序中,我通常会创建一个配置类,扩展WebSecurityConfigurerAdapter和配置WebSecurity如下:

@Override
public void configure(WebSecurity web) throws Exception {
    web.ignoring().antMatchers("/pathToIgnore");
}
Run Code Online (Sandbox Code Playgroud)

我怎样才能在反应式应用程序中做同样的事情?

security spring reactor

11
推荐指数
1
解决办法
6174
查看次数