如何实现弱引用的 EventBus actor?

Jan*_*rts 6 scala akka

不再被引用的 actor 仍然保持着对事件流的订阅,这种说法正确吗?至少,这是我用 Akka 做实验得到的结论……

我正在尝试在 EventBus 场景中为 actor 实现弱引用。在这类场景中,事件监听器/actor 通常会频繁地出现和消失,这与应当一直存在的独立 actor 不同。显式取消订阅当然可行,但我并不总是能判断出取消订阅的正确时机。

Akka 是否支持此类用例?

// Boot an actor system and subscribe a handler actor to every Exception event.
val system = ActorSystem("weak")
var actor = system.actorOf(Props[ExceptionHandler])
system.eventStream.subscribe(actor, classOf[Exception])

// First publication: the event is published and received.
system.eventStream.publish(new KnownProblem)

// Simulate the actor becoming redundant (session expiry or similar):
// drop the only local reference and request garbage collection repeatedly.
actor = null
for (_ <- 1 to 30) System.gc()

// Second publication: the event is STILL received.
system.eventStream.publish(new KnownProblem)
Run Code Online (Sandbox Code Playgroud)

ido*_*nie 0

好吧,我其实没能真正实现它,不过 actor 确实会在发生 GC 时停止。使用的是 Scala 2.9.2 (REPL) + Akka 2.0.3。

\n\n

使用 `WeakReference[ActorRef]` 的 `EventBus` 并没有帮助——因为在 Akka 中还有带 `ChildrenContainer`(`self.children`)的 `dungeon`,也可能存在对生命周期事件的 `Monitor` 订阅。我还没有尝试过的做法是:用只知道我们这个全新的 `WeakEventBus` 的调度器(dispatcher)来创建 actor——所以也许我没抓住重点?

\n\n

下面是用于 REPL 的代码(以合适的类路径启动 REPL,然后用 `:paste` 分两步粘贴):

\n\n
// Start REPL with something like:
// scala -Yrepl-sync -classpath "/opt/akka-2.0.3/lib/akka/akka-actor-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/akka-remote-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/config-0.3.1.jar:
// /opt/akka-2.0.3/lib/akka/protobuf-java-2.4.1.jar:
// /opt/akka-2.0.3/lib/akka/netty-3.5.3.Final.jar"

// :paste 1/2
import akka.actor._
import akka.pattern._
import akka.event._
import akka.util._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import akka.dispatch.Await
import scala.ref.WeakReference
import java.util.Comparator
import java.util.concurrent.atomic._
import java.util.UUID

// ---------------------------------------------------------------------------
// Messages
// ---------------------------------------------------------------------------

// NOTE(review): PostMessage is a case class extending the case class Message.
// Case-to-case inheritance is deprecated; it is kept unchanged here because
// the snippet targets Scala 2.9.2 -- confirm before porting to a newer Scala.
case class Message(val id: String, val timestamp: Long)
case class PostMessage(
  override val id: String = UUID.randomUUID().toString(),
  override val timestamp: Long = new java.util.Date().getTime(),
  text: String) extends Message(id, timestamp)

/** An event published on a bus; `channel` is what the weak bus classifies on. */
case class MessageEvent(val channel: String, val message: Message)

case class StartServer(nodeName: String)
case class ServerStarted(nodeName: String, actor: ActorRef)
case class IsAlive(nodeName: String)
case class IsAliveWeak(nodeName: String)
case class AmAlive(nodeName: String, actor: ActorRef)
// The code below sends and matches the companion object `GcCheck`,
// not an instance `GcCheck()`; both sides agree, so the message is delivered.
case class GcCheck()
case class GcCheckScheduled(isScheduled: Boolean, gcFlag: WeakReference[AnyRef])

// ---------------------------------------------------------------------------
// Weak event bus
// ---------------------------------------------------------------------------

/**
 * Lookup-style classification that keeps its subscribers behind
 * [[scala.ref.WeakReference]]s, so the bus itself does not hold a strong
 * reference to a subscriber.
 *
 * Subscribers are stored in an `Index` keyed by classifier and ordered by the
 * comparator defined below.
 */
trait WeakLookupClassification { this: WeakEventBus =>

  protected final val subscribers =
    new Index[Classifier, WeakReference[Subscriber]](
      mapSize(),
      new Comparator[WeakReference[Subscriber]] {
        // Each reference is dereferenced exactly once so that a collection
        // happening between two `get` calls cannot change the outcome.
        // Cleared references are ordered before every live subscriber and are
        // equal to each other; this keeps compare(a, b) and compare(b, a)
        // consistent, which the previous "-1 if either side is cleared" rule
        // did not.
        def compare(a: WeakReference[Subscriber], b: WeakReference[Subscriber]): Int =
          (a.get, b.get) match {
            case (Some(x), Some(y)) => compareSubscribers(x, y)
            case (None, None)       => 0
            case (None, Some(_))    => -1
            case (Some(_), None)    => 1
          }
      })

  /** Initial size handed to the underlying `Index`. */
  protected def mapSize(): Int

  /** Ordering of two live subscribers. */
  protected def compareSubscribers(a: Subscriber, b: Subscriber): Int

  /** Extracts the classifier under which `event` is looked up. */
  protected def classify(event: Event): Classifier

  /** Delivers `event` to a single live subscriber. */
  protected def publish(event: Event, subscriber: Subscriber): Unit

  def subscribe(subscriber: Subscriber, to: Classifier): Boolean =
    subscribers.put(to, new WeakReference(subscriber))

  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean =
    subscribers.remove(from, new WeakReference(subscriber))

  def unsubscribe(subscriber: Subscriber): Unit =
    subscribers.removeValue(new WeakReference(subscriber))

  /**
   * Publishes `event` to every subscriber registered for its classifier.
   *
   * Subscribers whose weak reference has already been cleared are skipped;
   * previously dereferencing such an entry threw a NoSuchElementException.
   */
  def publish(event: Event): Unit = {
    val refs = subscribers.valueIterator(classify(event))
    while (refs.hasNext) {
      refs.next().get match {
        case Some(subscriber) => publish(event, subscriber)
        case None             => () // subscriber was garbage collected
      }
    }
  }
}

/** Event bus that routes [[MessageEvent]]s by channel name to weakly-held actors. */
class WeakEventBus extends EventBus with WeakLookupClassification {
  type Event = MessageEvent
  type Classifier = String
  type Subscriber = ActorRef

  protected def compareSubscribers(a: ActorRef, b: ActorRef) = a compareTo b

  protected def mapSize(): Int = 10
  protected def classify(event: Event): Classifier = event.channel
  protected def publish(event: Event, subscriber: Subscriber): Unit =
    subscriber ! event
}

lazy val weakEventBus = new WeakEventBus

implicit val timeout = akka.util.Timeout(1000)
lazy val actorSystem = ActorSystem("serversys", ConfigFactory.parseString(""" 
akka {
  loglevel = "DEBUG"
  actor {
        provider = "akka.remote.RemoteActorRefProvider"
        debug {
          receive = on 
          autoreceive = on        
          lifecycle = on
          event-stream = on
        }
  }
  remote {
        transport = "akka.remote.netty.NettyRemoteTransport"
        log-sent-messages = on 
        log-received-messages = on    
  }
}
serverconf {
  include "common"
  akka {
        actor {
          deployment {
        /root {
          remote = "akka://serversys@127.0.0.1:2552"
        }    
          }
        }
        remote {
          netty {
        hostname = "127.0.0.1"
        port = 2552
          }
        }
  }
}
""").getConfig("serverconf"))

// ---------------------------------------------------------------------------
// Server actor
// ---------------------------------------------------------------------------

/**
 * Actor that subscribes itself to the weak event bus and periodically checks
 * a weakly-referenced flag object; once that flag has been collected the
 * actor stops itself.
 */
class Server extends Actor {
  // Whether a GcCheck has been scheduled for this actor.
  private[this] val scheduled = new AtomicBoolean(false)
  // Weak reference to the flag object created on StartServer (null until then).
  private[this] val gcFlagRef = new AtomicReference[WeakReference[AnyRef]]()

  val gcCheckPeriod = Duration(5000, "millis")

  /** Forwards the current scheduling state to self before the restart. */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    self ! GcCheckScheduled(scheduled.get, gcFlagRef.get)
    super.preRestart(reason, message)
  }

  /** Schedules a single GcCheck message to `who` after `period`. */
  def schedule(period: Duration, who: ActorRef) =
    actorSystem.scheduler.scheduleOnce(period)(who ! GcCheck)

  def receive = {
    case StartServer(nodeName) =>
      sender ! ServerStarted(nodeName, self)
      if (scheduled.compareAndSet(false, true))
        schedule(gcCheckPeriod, self)
      // The flag object is referenced only weakly once this handler returns.
      val gcFlagObj = new AnyRef()
      gcFlagRef.set(new WeakReference(gcFlagObj))
      weakEventBus.subscribe(self, nodeName)
      actorSystem.eventStream.unsubscribe(self)

    case GcCheck =>
      val gcFlag = gcFlagRef.get
      if (gcFlag == null) {
        sys.error("gcFlag")
      }
      gcFlag.get match {
        case Some(_) =>
          // Flag still alive: keep polling.
          scheduled.set(true)
          schedule(gcCheckPeriod, self)
        case None =>
          println("Actor stopped because of GC: " + self)
          context.stop(self)
      }

    case GcCheckScheduled(isScheduled, gcFlag) =>
      if (isScheduled && scheduled.compareAndSet(false, isScheduled)) {
        gcFlagRef.compareAndSet(null, gcFlag)
        schedule(gcCheckPeriod, self)
      }

    case IsAlive(nodeName) =>
      println("Im alive (default EventBus): " + nodeName)
      sender ! AmAlive(nodeName, self)

    case e: MessageEvent =>
      println("Im alive (weak EventBus): " + e)
  }
}

// :paste 2/2

/**
 * Parent actor that creates [[Server]] children and relays liveness checks.
 *
 * NOTE(review): the handlers block on `Await.result` inside `receive`; this is
 * kept as in the original experiment.
 */
class Root extends Actor {
  def receive = {
    case start @ StartServer(nodeName) =>
      val server = context.actorOf(Props[Server], nodeName)
      context.watch(server)
      // Match on the reply directly so that an unexpected answer reaches the
      // failure branch instead of failing earlier in a cast.
      Await.result(server ? start, timeout.duration) match {
        case started: ServerStarted =>
          sender ! started
        case _ =>
          throw new RuntimeException(
            "[S][FAIL] Could not start server: " + start)
      }

    case isAlive @ IsAlive(nodeName) =>
      Await.result(context.actorFor(nodeName) ? isAlive, timeout.duration) match {
        case AmAlive(aliveNode, _) =>
          println("[S][SUCC] Server is alive : " + aliveNode)
        case _ =>
          throw new RuntimeException("[S][FAIL] Wrong answer: " + nodeName)
      }

    case IsAliveWeak(nodeName) =>
      actorSystem.eventStream.publish(MessageEvent(nodeName,
        PostMessage(text = "isAlive-default")))
      weakEventBus.publish(MessageEvent(nodeName,
        PostMessage(text = "isAlive-weak")))
  }
}

lazy val rootActor = actorSystem.actorOf(Props[Root], "root")

/** Convenience entry points for driving the experiment from the REPL. */
object Root {
  /**
   * Asks the root actor to start a server named `nodeName` and waits for the
   * reply.
   *
   * @return the started server's ActorRef wrapped in Some
   * @throws RuntimeException if the reply is not a ServerStarted message
   */
  def start(nodeName: String): Option[ActorRef] = {
    val msg = StartServer(nodeName)
    Await.result(rootActor ? msg, timeout.duration) match {
      case succ @ ServerStarted(_, actor) =>
        println("[S][SUCC] Server started: " + succ)
        Some(actor)
      case _ =>
        throw new RuntimeException("[S][FAIL] Could not start server: " + msg)
    }
  }

  def isAlive(nodeName: String) = rootActor ! IsAlive(nodeName)
  def isAliveWeak(nodeName: String) = rootActor ! IsAliveWeak(nodeName)
}

////////////////
// actual test
Root.start("weak")
Thread.sleep(7000L)
System.gc()
Root.isAlive("weak")
Run Code Online (Sandbox Code Playgroud)\n