Akka事件总线教程

Tsu*_*ume 16 scala akka event-bus

关于如何在akka中使用事件总线,有没有很好的教程/解释?我已经阅读了Akka文档,但我发现很难理解如何使用事件总线

cmb*_*ter 42

不确定是否有任何好的教程,但我可以给你一个可能的用户案例的快速示例,其中使用事件流可能会有所帮助.但是,在较高级别,事件流是满足应用程序可能具有的发布/订阅类型要求的良好机制.假设您有一个用例,用于更新系统中用户的余额.经常访问余额,因此您决定对其进行缓存以获得更好的性能.更新余额时,您还需要检查并查看用户是否超过其余额的阈值,如果是,请通过电子邮件发送.您不希望缓存丢弃或余额阈值检查直接绑定到主余额更新调用,因为它们可能会很重,并且会降低用户的响应速度.您可以像这样建模特定的一组要求:

//Message and event classes
case class UpdateAccountBalance(userId:Long, amount:Long)
case class BalanceUpdated(userId:Long)

//Actor that performs account updates
class AccountManager extends Actor{
  val dao = new AccountManagerDao

  def receive = {
    case UpdateAccountBalance(userId, amount) =>
      val res = for(result <- dao.updateBalance(userId, amount)) yield{
        context.system.eventStream.publish(BalanceUpdated(userId))
        result                
      }

      sender ! res
  }
}

//Actor that manages a cache of account balance data
class AccountCacher extends Actor{
  val cache = new AccountCache

  override def preStart = {
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
  }

  def receive = {
    case BalanceUpdated(userId) =>
      cache.remove(userId)
  }
}

//Actor that checks balance after an update to warn of low balance
class LowBalanceChecker extends Actor{
  val dao = new LowBalanceDao

  override def preStart = {
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
  }

  def receive = {
    case BalanceUpdated(userId) =>
      for{
        balance <- dao.getBalance(userId)
        theshold <- dao.getBalanceThreshold(userId)
        if (balance < threshold)
      }{
        sendBalanceEmail(userId, balance)
      }
  }
}
Run Code Online (Sandbox Code Playgroud)

在此示例中,AccountCacherand和LowBalanceChecker两者都订阅eventStreamBalanceUpdated事件的类类型.如果此事件是发布到流的事件,则它们将由这两个actor实例接收.然后,在AccountManager平衡更新成功时,它会BalanceUpdated为用户引发一个事件.当发生这种情况时,并行地将该消息传递到邮箱,AccountCacher并且LowBalanceChecker导致从缓存中删除余额并检查帐户阈值并且可能发送电子邮件.

现在,您可以直接将直接tell (!)调用AccountManager与其他两个演员直接通信,但有人可能认为可能过于紧密地耦合平衡更新的这两个"副作用",并且这些类型的细节不会必然属于AccountManager.如果你的条件可能导致一些额外的事情(检查,更新等),这些事情纯粹是副作用(不是核心业务流本身的一部分),那么事件流可能是一个好方法解除正在筹集的事件以及可能需要对该事件作出反应的人.


sou*_*ica 11

EventBus每个人都有一个存在ActorSystem.这EventBus被称为事件流,可以通过调用获得system.eventStream.

ActorSystem使用事件流进行许多操作,包括记录,发送死信集群事件.

您还可以将事件流用于您自己的发布/订阅要求.例如,事件流在测试期间可能很有用.订阅该检测试剂盒testActor以事件流的特定事件(如记录事件),你可以expect他们.当某些事情发生时您不会向另一个演员发送消息但您仍然需要在测试中期望该事件时,这可能特别有用.

请注意,事件流仅在一个内部工作ActorSystem.如果您正在使用流上发布的远程事件,则默认情况下不会跨越到远程系统(尽管您可以自己添加该支持).

理论上,EventBus如果您不想使用事件流,您可以创建一个单独的.

正在为Akka 2.2开发更好的事件总线文档,因此在此票证完成后再次检查.