说未引用的演员仍然订阅了事件流是否正确?至少,这是我从Akka实验中得到的......
我正在尝试在EventBus场景中为actor实现弱引用.在这些情况下,事件监听器/演员通常会来来去去.与应该一直存在的独立演员不同.显然取消注册当然可行.但我并不总是能够认识到这一点的正确时机.
Akka是否提供此类用例?
val as = ActorSystem.create("weak")
var actor = as.actorOf(Props[ExceptionHandler])
as.eventStream.subscribe(actor,classOf[Exception])
// an event is published & received
as.eventStream.publish(new KnownProblem)
//session expires or whatever that makes the actor redundant
actor = null
(1 to 30).foreach(_ => System.gc)
// an event is published & STILL received
as.eventStream.publish(new KnownProblem)
Run Code Online (Sandbox Code Playgroud)
好吧,我实际上无法实现它,但 actor 正在停止 GC。使用 Scala 2.9.2 (REPL) + Akka 2.0.3。
\n\nwith没有帮助 - 因为在 Akka 中你也有一个EventBuswith ( ) ,也可以订阅生命周期事件。我没有尝试过的事情 - 使用只知道我们新闪亮的调度程序创建演员- 所以也许我错过了重点?WeakReference[ActorRef]dungeonChildrenContainerself.childrenMonitorWeakEventBus
这是 REPL 的代码(通过适当的导入启动它,然后:paste分两步完成):
// Start REPL with something like:\n// scala -Yrepl-sync -classpath "/opt/akka-2.0.3/lib/akka/akka-actor-2.0.3.jar:\n// /opt/akka-2.0.3/lib/akka/akka-remote-2.0.3.jar:\n// /opt/akka-2.0.3/lib/akka/config-0.3.1.jar:\n// /opt/akka-2.0.3/lib/akka/protobuf-java-2.4.1.jar:\n// /opt/akka-2.0.3/lib/akka/netty-3.5.3.Final.jar"\n\n// :paste 1/2\nimport akka.actor._\nimport akka.pattern._\nimport akka.event._\nimport akka.util._\nimport com.typesafe.config.ConfigFactory\nimport akka.util.Timeout\nimport akka.dispatch.Await\nimport scala.ref.WeakReference\nimport java.util.Comparator\nimport java.util.concurrent.atomic._\nimport java.util.UUID\n\ncase class Message(val id:String,val timestamp: Long)\ncase class PostMessage(\n override val id:String=UUID.randomUUID().toString(), \n override val timestamp: Long=new java.util.Date().getTime(), \n text:String) extends Message(id, timestamp) \ncase class MessageEvent(val channel:String, val message:Message)\n\ncase class StartServer(nodeName: String)\ncase class ServerStarted(nodeName: String, actor: ActorRef) \ncase class IsAlive(nodeName: String)\ncase class IsAliveWeak(nodeName: String)\ncase class AmAlive(nodeName: String, actor: ActorRef)\ncase class GcCheck()\ncase class GcCheckScheduled(isScheduled: Boolean, \n gcFlag: WeakReference[AnyRef])\n\ntrait WeakLookupClassification { this: WeakEventBus \xe2\x87\x92\nprotected final val subscribers = new Index[Classifier, \n WeakReference[Subscriber]](mapSize(), \n new Comparator[WeakReference[Subscriber]] {\n def compare(a: WeakReference[Subscriber], \n b: WeakReference[Subscriber]): Int = { \n if (a.get == None || b.get == None) -1\n else compareSubscribers(a.get.get, b.get.get)\n }\n }) \nprotected def mapSize(): Int\nprotected def compareSubscribers(a: Subscriber, b: Subscriber): Int\nprotected def classify(event: Event): Classifier\nprotected def publish(event: Event, subscriber: Subscriber): Unit\ndef subscribe(subscriber: Subscriber, to: Classifier): Boolean = \n subscribers.put(to, new WeakReference(subscriber))\ndef unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = \n subscribers.remove(from, new WeakReference(subscriber))\ndef unsubscribe(subscriber: Subscriber): Unit = \n subscribers.removeValue(new WeakReference(subscriber))\ndef publish(event: Event): Unit = {\n val i = subscribers.valueIterator(classify(event))\n while (i.hasNext) publish(event, i.next().get.get)\n}\n }\n\nclass WeakEventBus extends EventBus with WeakLookupClassification {\n type Event = MessageEvent\n type Classifier=String \n type Subscriber = ActorRef\n\n protected def compareSubscribers(a: ActorRef, b: ActorRef) = a compareTo b\n\n protected def mapSize(): Int = 10\n protected def classify(event: Event): Classifier = event.channel\n protected def publish(event: Event, subscriber: Subscriber): Unit =\n subscriber ! event\n}\n\nlazy val weakEventBus = new WeakEventBus\n\nimplicit val timeout = akka.util.Timeout(1000)\nlazy val actorSystem = ActorSystem("serversys", ConfigFactory.parseString(""" \nakka {\n loglevel = "DEBUG"\n actor {\n provider = "akka.remote.RemoteActorRefProvider"\n debug {\n receive = on \n autoreceive = on \n lifecycle = on\n event-stream = on\n }\n }\n remote {\n transport = "akka.remote.netty.NettyRemoteTransport"\n log-sent-messages = on \n log-received-messages = on \n }\n}\nserverconf {\n include "common"\n akka {\n actor {\n deployment {\n /root {\n remote = "akka://serversys@127.0.0.1:2552"\n } \n }\n }\n remote {\n netty {\n hostname = "127.0.0.1"\n port = 2552\n }\n }\n }\n}\n""").getConfig("serverconf"))\n\nclass Server extends Actor {\n private[this] val scheduled = new AtomicBoolean(false)\n private[this] val gcFlagRef = new AtomicReference[WeakReference[AnyRef]]()\n\n val gcCheckPeriod = Duration(5000, "millis")\n\n override def preRestart(reason: Throwable, message: Option[Any]) {\n self ! GcCheckScheduled(scheduled.get, gcFlagRef.get)\n super.preRestart(reason, message)\n }\n\n def schedule(period: Duration, who: ActorRef) = \n actorSystem.scheduler.scheduleOnce(period)(who ! GcCheck) \n\n def receive = { \n case StartServer(nodeName) => \n sender ! ServerStarted(nodeName, self)\n if (scheduled.compareAndSet(false, true)) \n schedule(gcCheckPeriod, self)\n val gcFlagObj = new AnyRef() \n gcFlagRef.set(new WeakReference(gcFlagObj))\n weakEventBus.subscribe(self, nodeName)\n actorSystem.eventStream.unsubscribe(self) \n case GcCheck =>\n val gcFlag = gcFlagRef.get\n if (gcFlag == null) {\n sys.error("gcFlag")\n }\n gcFlag.get match {\n case Some(gcFlagObj) =>\n scheduled.set(true)\n schedule(gcCheckPeriod, self) \n case None =>\n println("Actor stopped because of GC: " + self)\n context.stop(self) \n }\n case GcCheckScheduled(isScheduled, gcFlag) =>\n if (isScheduled && scheduled.compareAndSet(false, isScheduled)) {\n gcFlagRef.compareAndSet(null, gcFlag)\n schedule(gcCheckPeriod, self) \n }\n case IsAlive(nodeName) =>\n println("Im alive (default EventBus): " + nodeName)\n sender ! AmAlive(nodeName, self)\n case e: MessageEvent => \n println("Im alive (weak EventBus): " + e) \n } \n} \n\n// :paste 2/2\nclass Root extends Actor { \n def receive = {\n case start @ StartServer(nodeName) =>\n val server = context.actorOf(Props[Server], nodeName)\n context.watch(server)\n Await.result(server ? start, timeout.duration)\n .asInstanceOf[ServerStarted] match {\n case started @ ServerStarted(nodeName, _) => \n sender ! started\n case _ => \n throw new RuntimeException(\n "[S][FAIL] Could not start server: " + start)\n }\n case isAlive @ IsAlive(nodeName) =>\n Await.result(context.actorFor(nodeName) ? isAlive, \n timeout.duration).asInstanceOf[AmAlive] match {\n case AmAlive(nodeName, _) => \n println("[S][SUCC] Server is alive : " + nodeName)\n case _ => \n throw new RuntimeException("[S][FAIL] Wrong answer: " + nodeName) \n }\n case isAliveWeak @ IsAliveWeak(nodeName) => \n actorSystem.eventStream.publish(MessageEvent(nodeName, \n PostMessage(text="isAlive-default"))) \n weakEventBus.publish(MessageEvent(nodeName, \n PostMessage(text="isAlive-weak"))) \n}\n }\n\nlazy val rootActor = actorSystem.actorOf(Props[Root], "root")\n\nobject Root {\n def start(nodeName: String) = {\n val msg = StartServer(nodeName)\n var startedActor: Option[ActorRef] = None\n Await.result(rootActor ? msg, timeout.duration)\n .asInstanceOf[ServerStarted] match {\n case succ @ ServerStarted(nodeName, actor) => \n println("[S][SUCC] Server started: " + succ)\n startedActor = Some(actor)\n case _ => \n throw new RuntimeException("[S][FAIL] Could not start server: " + msg)\n }\n startedActor \n }\n def isAlive(nodeName: String) = rootActor ! IsAlive(nodeName)\n def isAliveWeak(nodeName: String) = rootActor ! IsAliveWeak(nodeName)\n}\n\n////////////////\n// actual test \nRoot.start("weak")\nThread.sleep(7000L)\nSystem.gc()\nRoot.isAlive("weak")\nRun Code Online (Sandbox Code Playgroud)\n
| 归档时间: |
|
| 查看次数: |
680 次 |
| 最近记录: |