在Spring中使用什么样的"EventBus"?内置,Reactor,Akka?

Ben*_*n M 40 spring multithreading event-driven-design akka project-reactor

我们将在几周内开始新的Spring 4应用程序.我们想使用一些事件驱动的架构.今年我在这里和那里读到关于"Reactor"的信息,在网上搜索时,我偶然发现了"Akka".

所以现在我们有3个选择:

我无法找到真正的比较.


现在我们只需要:

  • X 注册听 Event E
  • Y 注册听 Event E
  • Z 发送一个 Event E

然后X,Y将接收并处理该事件.

我们很可能会以异步方式使用它,但肯定会有一些同步方案.我们很可能总是将一个类作为事件发送.(Reactor样本主要使用字符串和字符串模式,但它也支持对象).


据我所知,ApplicationEvent默认情况下同步Reactor工作并以异步方式工作.并且Reactor还允许使用该await()方法使其有点同步.Akka提供或多或少相同Reactor,但也支持Remoting.

关于Reactor的await()方法:它可以等待多个线程完成吗?或者甚至可能是这些线程的一部分?如果我们从上面举例:

  • X 注册听 Event E
  • Y 注册听 Event E
  • Z 发送一个 Event E

可以通过说:等待X Y完成来使其同步.它是否有可能让它等待X,但不是为了Y


也许还有一些替代品?例如JMS呢?

很多问题,但希望你能提供一些答案!

谢谢!


编辑:示例用例

  1. 当特定事件被触发时,我想创建10000封电子邮件.每封电子邮件都必须使用特定于用户的内容生成.所以我创建了很多线程(max = system cpu cores)来创建邮件并且不阻塞调用者线程,因为这可能需要几分钟.

  2. 当特定事件被触发时,我想从未知数量的服务中收集信息.每次获取大约需要100毫秒.在这里我可以想象使用Reactor await,因为我需要这些信息来继续我在主线程中的工作.

  3. 当特定事件被触发时,我想基于应用程序配置执行一些操作.因此,应用程序必须能够动态(un)注册comsumers /事件处理程序.他们会用事件做自己的事情,我不在乎.所以我会为每个处理程序创建一个线程,然后继续在主线程中完成我的工作.

  4. 简单的解耦:我基本上知道所有接收器,但我只是不想在我的代码中调用每个接收器.这应该主要是同步完成的.

听起来像我需要ThreadPool或RingBuffer.这些框架是否具有动态RingBuffers,如果需要,它会增大?

小智 31

我不确定我能否在这个狭小的空间内充分回答你的问题.但我会试一试!:)

就功能而言,Spring的ApplicationEvent系统和Reactor非常不同.ApplicationEvent路由基于由处理的类型ApplicationListener.任何比这更复杂的东西,你必须自己实现逻辑(但这不一定是坏事).但是,Reactor提供了一个全面的路由层,它也非常轻量级且完全可扩展.两者在订阅和发布事件的能力方面的功能相似,这实际上是任何事件驱动系统的一个特征.另外,不要忘记spring-messaging使用Spring 4 的新模块.它是Spring Integration中可用工具的一个子集,还提供了围绕事件驱动架构构建的抽象.

Reactor将帮助您解决一些您必须自己管理的关键问题:

选择器匹配:Reactor进行Selector匹配,包含一系列匹配 - 从简单的.equals(Object other)调用到更复杂的URI模板匹配,允许占位符提取.您还可以使用自己的自定义逻辑扩展内置选择器,以便可以将富对象用作通知键(例如,域对象).

流和承诺API:您Promise已经参考该.await()方法提到了API ,这实际上是针对需要阻塞行为的现有代码.使用Reactor编写新代码时,不能强调使用组合和回调来有效利用系统资源而不阻塞线程.在依赖于少量线程来执行大量任务的体系结构中,阻止调用者几乎不是一个好主意.期货根本不是云可扩展的,这就是现代应用程序利用替代解决方案的原因.

您的应用程序可以使用Streams或Promises构建,但老实说,我认为您会发现它Stream更灵活.关键的好处是API的可组合性,它允许您在依赖链中将操作连接在一起而不会阻塞.作为一个基于您的电子邮件用例的完全袖手旁观的例子,您描述了:

@Autowired
Environment env;
@Autowired
SmtpClient client;

// Using a ThreadPoolDispatcher
Deferred<DomainObject, Stream<DomainObject>> input = Streams.defer(env, THREAD_POOL);

input.compose()
  .map(new Function<DomainObject, EmailTemplate>() {
    public EmailTemplate apply(DomainObject in) {
      // generate the email
      return new EmailTemplate(in);
    }
  })
  .consume(new Consumer<EmailTemplate>() {
    public void accept(EmailTemplate email) {
      // send the email
      client.send(email);
    }
  });

// Publish input into Deferred
DomainObject obj = reader.readNext();
if(null != obj) {
  input.accept(obj);
}
Run Code Online (Sandbox Code Playgroud)

Reactor还提供了Boundary,它基本上CountDownLatch用于阻塞任意消费者(因此,Promise如果你想要做的就是阻止Consumer完成,你就不必构造一个).Reactor在这种情况下,您可以使用raw ,并使用on()notify()方法来触发服务状态检查.

然而,对于某些事情,似乎你想要的是Future从a 返回ExecutorService,不是吗?为什么不保持简单?只有在您的吞吐量性能和开销效率很重要的情况下,Reactor才会真正受益.如果您正在阻止调用线程,那么您可能会消除Reactor将为您提供的效率增益,因此在这种情况下使用更传统的工具集可能会更好.

关于Reactor开放性的好处是没有什么可以阻止两者进行交互.您可以随意混合FuturesConsumers不会产生静电.在这种情况下,请记住,您只会像最慢的组件一样快.


Ada*_*ent 9

让我们忽略Spring,ApplicationEvent因为它实际上不是针对您的要求而设计的(更多关于bean生命周期管理).

你需要弄清楚的是你想要做什么

  1. 面向对象的方式(即演员,动态消费者,即时注册)
  2. 服务方式(静态消费者,在启动时注册).

使用您的例子XY他们是:

  1. 短暂的实例(1)或是他们
  2. 长寿的单身人士/服务对象(2)?

如果你需要在飞行中注册消费者,那么Akka是一个不错的选择(我不确定反应堆,因为我从未使用它).如果您不想在短暂的对象中使用JMS或AMQP.

您还需要了解这些库正试图解决两个问题:

  1. 并发(即在同一台机器上并行处理)
  2. 分配(即在多台机器上并行处理)

Reactor和Akka主要关注#1.Akka最近刚刚添加了集群支持,而actor抽象使得#2更容易实现.消息队列(JMS,AMQP)专注于#2.

对于我自己的工作,我做服务路线并使用经过大量修改的Guava EventBus和RabbitMQ.我使用类似的注解番石榴Eventbus但也有发送总线上,但是你可以只使用番石榴的EventBus在异步模式下的POC,然后让你自己像我一样的对象注解.

您可能认为需要拥有动态消费者(1),但大多数问题都可以通过简单的pub/sub来解决.管理动态消费者也很棘手(因此,Akka是一个不错的选择,因为演员模型对此有各种管理)