如何在没有Akka的情况下实现actor模型?

dk1*_*k14 9 concurrency scala actor akka

没有Akka如何实现简单的演员?对于许多(非固定计数)actor实例,绿线程,IoC(生命周期,基于Props的工厂,ActorRef),监督,背压等,我不需要高性能.只需要顺序性(队列)+处理程序+状态+消息传递.

作为副作用,我实际上需要基于小型演员的管道(带递归链接)+一些并行演员来优化DSP算法计算.它将在没有传递依赖的库内部,所以我不希望(并且不能因为它是一个jar-plugin)推动用户创建和传递akkaSystem,库应该具有尽可能简单和轻量级的接口.我不需要IoC,因为它只是一个库(一组函数),而不是一个框架 - 因此它比结构更具算法复杂性.但是,我认为actor是描述协议的好工具,我实际上可以将算法分解为少量的异步交互实体,因此它符合我的需要.

为什么不Akka

Akka很重,这意味着:

  • 这是外在的依赖;
  • 具有复杂的界面和实现;
  • 例如,库的用户不透明 - 所有实例都由akka的IoC管理,因此不能保证一个逻辑actor始终由同一个实例维护,重启将创建一个新的;
  • 需要额外的迁移支持,这与scala的迁移支持本身相当.
  • 使用jstack/ jconsole/ 调试akka的绿色线程也可能更难jvisualvm,因为一个actor可能会在任何线程上执行操作.

当然,Akka的jar(1.9Mb)和内存消耗(每GB 250万个演员)根本不重,所以你甚至可以在Android上运行它.但是也知道你应该使用专门的工具来观察和分析演员(比如Typesafe Activator/Console),用户可能不熟悉(我不会让他们学习它).对企业项目来说一切都很好,因为它几乎总是有IoC,一些专门的工具和连续的迁移,但对于一个简单的库来说这不是一个好方法.

PS关于依赖项.我没有它们,我不想添加任何(我甚至避免使用scalaz,这实际上适合这里一点),因为它会导致大量维护 - 我将不得不保留我的简单库与Akka保持同步.

And*_*yuk 9

这里是JVM世界中最简单高效的演员,其基于来自Viktor Klang的极简主义Scala演员的API:https: //github.com/plokhotnyuk/actors/blob/41eea0277530f86e4f9557b451c7e34345557ce3/src/test/scala/com/github/gist/ viktorklang/Actor.scala

它在使用中非常方便且安全,但在消息接收方面不是类型安全的,并且不能在进程或主机之间发送消息.

主要特点:

有状态计数器的示例:

  def process(self: Address, msg: Any, state: Int): Effect = if (state > 0) { 
     println(msg + " " + state)
     self ! msg
     Become { msg => 
        process(self, msg, state - 1)
     }
  } else Die

  val actor = Actor(self => msg => process(self, msg, 5))
Run Code Online (Sandbox Code Playgroud)

结果:

scala> actor ! "a"
a 5

scala> a 4
a 3
a 2
a 1
Run Code Online (Sandbox Code Playgroud)


dk1*_*k14 5

这将使用FixedThreadPool(及其内部任务队列):

import scala.concurrent._

trait Actor[T] {
  implicit val context = ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(1))
  def receive: T => Unit
  def !(m: T) = Future { receive(m) }
}
Run Code Online (Sandbox Code Playgroud)

大小为 1 的固定线程池保证了此处的顺序性。当然,如果您需要 100500 个动态创建的 actor,那么这不是管理线程的最佳方法,但如果您需要每个应用程序有固定数量的 actor 来实现您的协议,那么它就可以了。

用法:

class Ping(pong: => Actor[Int])  extends Actor[Int] {     
      def receive = {
          case m: Int => 
             println(m)
             if (m > 0) pong ! (m - 1)
      }    
}

object System { 
      lazy val ping: Actor[Int] = new Ping(pong) //be careful with lazy vals mutual links between different systems (objects); that's why people prefer ActorRef
      lazy val pong: Actor[Int] = new Ping(ping)
}

System.ping ! 5
Run Code Online (Sandbox Code Playgroud)

结果:

import scala.concurrent._
defined trait Actor
defined class Ping
defined object System
res17: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@6be61f2c
5
4
3
2
1
0

scala> System.ping ! 5; System.ping ! 7
5
7
4
6
3
5
2
res19: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@54b053b1
4
1
3
0
2
1
0
Run Code Online (Sandbox Code Playgroud)

此实现使用两个 Java 线程,因此它比没有并行化的计数快“两倍”。