Luc*_*ano 7 scala akka rx-java
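I have been using the Scala bindings for RX Java for some time now, and I am thinking about combining them with Akka Actors. I would like to know whether it is safe/possible to pass RX Observables between Akka Actors. For example, a program that prints the squares of the integers up to 20 (one per second):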
/* producer creates an observable and sends it to the worker */
object Producer extends Actor {
  val toTwenty : Observable[Int] = Observable.interval(1 second).take(20)

  def receive = {
    case o : Observable[Int] =>
      o.subscribe(n => println(n))  // print each value that comes back
  }

  worker ! toTwenty
}

/* worker which returns squares of even numbers */
object Worker extends Actor {
  def receive = {
    case o : Observable[Int] =>
      // parenthesized so that the transformed Observable is what gets sent;
      // n * n squares the value (^ is bitwise XOR in Scala)
      sender ! (o filter { _ % 2 == 0 } map { n => n * n })
  }
}
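(Please treat this as pseudocode; it does not compile.) Note that I am sending an Observable from one actor to another. I would like to understand:
- An Observable cannot be sent in a distributed system, since it is a reference to an object in local memory. But would it work locally?
- Work is scheduled in the subscribe call in Producer. Can I split the work so that it is done separately on each actor?
Digression: I have seen some projects that look to combine RX and Actors:
http://jmhofer.johoop.de/?p=507 and https://github.com/jmhofer/rxjava-akka
But these differ in that they do not simply pass the Observable as a message between actors. They first call subscribe() to get the values, then send those values to an actor's mailbox and create a new Observable from them. Or am I mistaken?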
mav*_*ein 10
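Your approach is not a good idea. The main idea behind Akka is that messages are sent to an actor's mailbox and the actor processes them sequentially (on one thread). That way two threads can never access the actor's state at once, and there are no concurrency problems.
In your case you call subscribe on the Observable. Your onNext callback may be executed on another thread, so suddenly two threads can access your actor's state. You therefore have to be very careful about what you do inside the callback. That is the reason behind your last observation about the other implementations: they appear to grab the value in onNext and send it as a message. You must not change the actor's internal state inside such a callback. Instead, send a message to the same actor. That way sequential processing on one thread is guaranteed again.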
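The advice above (leave actor state alone inside onNext and send a message to the same actor instead) could be sketched roughly as follows. This is only an illustration under assumptions: the names Tick, Counter and CounterApp are hypothetical and not from the original posts, and it assumes akka-actor and RxScala are on the classpath.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import rx.lang.scala.{Observable, Subscription}
import scala.concurrent.duration._

// Hypothetical message type wrapping each value emitted by the Observable.
final case class Tick(n: Long)

class Counter extends Actor {
  // Actor state: only ever read or written inside receive.
  private var sum = 0L

  // The callback runs on an Rx thread, so it only forwards the value as a
  // message. `self` is an immutable ActorRef and is safe to use from there.
  private val subscription: Subscription =
    Observable.interval(1.second).take(5).subscribe(n => self ! Tick(n))

  def receive: Receive = {
    case Tick(n) =>
      sum += n // mutated on the actor's own processing thread
      println(s"sum so far: $sum")
  }

  // Stop the Observable from sending further messages once the actor stops.
  override def postStop(): Unit = subscription.unsubscribe()
}

object CounterApp extends App {
  val system = ActorSystem("demo")
  system.actorOf(Props(new Counter), "counter")
}
```

The design choice here is that the Observable never sees the mutable field at all; everything it produces goes through the mailbox, so the usual one-message-at-a-time guarantee still covers the state.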
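I spent some time experimenting and found that you can use Observables in Akka. In fact, since an Observable can be thought of as a multi-valued extension of a Future, you can follow the same guidelines as for combining Actors and Futures. The use of Futures in Akka is supported/encouraged by both the official documentation and textbooks (e.g. Akka Concurrency, Wyatt 2013), but with plenty of caveats.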
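For readers unfamiliar with the Actor-plus-Future guidelines referred to here, a minimal sketch of the usual pattern is to capture sender() in a val and pipe the Future's result back as an ordinary message. The names Square and Squarer are hypothetical, introduced only for illustration; pipeTo comes from akka.pattern.pipe.

```scala
import akka.actor.Actor
import akka.pattern.pipe
import scala.concurrent.Future

// Hypothetical request message.
final case class Square(n: Int)

class Squarer extends Actor {
  // ExecutionContext used both to run the Future and by pipeTo.
  import context.dispatcher

  def receive: Receive = {
    case Square(n) =>
      // sender() is only valid while this message is being processed,
      // so capture it before any asynchronous code runs.
      val replyTo = sender()
      // The Future body touches no actor state; its result is delivered
      // to replyTo as a normal message.
      Future(n * n).pipeTo(replyTo)
  }
}
```

The same two habits (capture what you need up front, communicate results as messages) are what carry over to Observables.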
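First the positives:
- Observables, like Futures, are immutable, so in theory they should be safe to pass around in messages.
- Observable lets you specify the execution context, much like Future. This is done with Observable.observeOn(scheduler). You can create a scheduler from Akka's execution context by passing the Akka dispatcher (e.g. system.dispatcher or context.dispatcher) to the rx.lang.scala.schedulers.ExecutorScheduler constructor. This should ensure that they are synchronized.
- Akka's ask pattern returns a Future; a similar pattern can be used for Observables (see the bottom of this post). This also solves the problem of sending messages to remote observables.
Now the caveats: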
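- Observable.subscribe() should not use the actor's enclosing scope to access its internal state. For example, you should not call sender inside the subscription. Instead, store it in a val and then access that val, as in the example below.
Finally, I implemented an equivalent of the ask pattern for Observables. It uses toObservable or ?? to return an Observable asynchronously, backed by a temporary actor and a PublishSubject behind the scenes. Note that the messages sent by the source are of type rx.lang.scala.Notification, produced with materialize(), so that they can carry the completed and error states of the observable contract. Otherwise we would have no way of signalling those states to the sink. However, nothing stops you from sending messages of arbitrary type; these simply call onNext(). The observable has a timeout: if no message is received within a certain interval, it stops with a timeout exception.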
It is used like this:
import akka.actor._
import akka.pattern.RX._   // wildcard import brings the implicit ?? conversion into scope
import rx.lang.scala.Observable
import scala.concurrent.duration._

implicit val timeout = akka.util.Timeout(10 seconds)

case object Req

val system = ActorSystem("test")
val source = system.actorOf(Props[Source],"thesource")

class Source() extends Actor {
  def receive : Receive = {
    case Req =>
      val s = sender()  // capture the sender before subscribing
      Observable.interval(1 second).take(5).materialize.subscribe{s ! _}
  }
}

val obs = source ?? Req

obs.observeOn(rx.lang.scala.schedulers.ExecutorScheduler(system.dispatcher)).subscribe(
  (l : Any) => println ("onnext : " + l.toString),
  (error : Throwable) => { error.printStackTrace ; system.shutdown() },
  () => { println("completed, shutting system down"); system.shutdown() })
and produces this output:
onnext : 0
onnext : 1
onnext : 2
onnext : 3
onnext : 4
completed, shutting system down
The source is below. It is a modified version of AskSupport.scala.
package akka.pattern

/*
 * File : RxSupport.scala
 * This package is a modified version of 'AskSupport' to provide methods to
 * support RX Observables.
 */
import rx.lang.scala.{Observable,Subject,Notification}
import java.util.concurrent.TimeoutException
import akka.util.Timeout
import akka.actor._
import scala.concurrent.ExecutionContext
import akka.util.Unsafe
import scala.annotation.tailrec
import akka.dispatch.sysmsg._

class RxTimeoutException(message: String, cause: Throwable) extends TimeoutException(message) {
  def this(message: String) = this(message, null: Throwable)
  override def getCause(): Throwable = cause
}

trait RxSupport {
  implicit def toRx(actorRef : ActorRef) : RxActorRef = new RxActorRef(actorRef)
  def toObservable(actorRef : ActorRef, message : Any)(implicit timeout : Timeout) : Observable[Any] = actorRef ?? message
  implicit def toRx(actorSelection : ActorSelection) : RxActorSelection = new RxActorSelection(actorSelection)
  def toObservable(actorSelection : ActorSelection, message : Any)(implicit timeout : Timeout): Observable[Any] = actorSelection ?? message
}

final class RxActorRef(val actorRef : ActorRef) extends AnyVal {
  def toObservable(message : Any)(implicit timeout : Timeout) : Observable[Any] = actorRef match {
    case ref : InternalActorRef if ref.isTerminated =>
      actorRef ! message
      Observable.error(new RxTimeoutException(s"Recipient[$actorRef] has already been terminated."))
    case ref : InternalActorRef =>
      if (timeout.duration.length <= 0)
        Observable.error(new IllegalArgumentException(s"Timeout length must not be negative, message not sent to [$actorRef]"))
      else {
        val a = RxSubjectActorRef(ref.provider, timeout, targetName = actorRef.toString)
        actorRef.tell(message, a)
        a.result.doOnCompleted{a.stop}.timeout(timeout.duration)
      }
  }
  def ??(message :Any)(implicit timeout : Timeout) : Observable[Any] = toObservable(message)(timeout)
}

final class RxActorSelection(val actorSel : ActorSelection) extends AnyVal {
  def toObservable(message : Any)(implicit timeout : Timeout) : Observable[Any] = actorSel.anchor match {
    case ref : InternalActorRef =>
      if (timeout.duration.length <= 0)
        Observable.error(new IllegalArgumentException(s"Timeout length must not be negative, message not sent to [$actorSel]"))
      else {
        val a = RxSubjectActorRef(ref.provider, timeout, targetName = actorSel.toString)
        actorSel.tell(message, a)
        a.result.doOnCompleted{a.stop}.timeout(timeout.duration)
      }
    case _ => Observable.error(new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorSel]"))
  }
  def ??(message :Any)(implicit timeout : Timeout) : Observable[Any] = toObservable(message)(timeout)
}

private[akka] final class RxSubjectActorRef private (val provider : ActorRefProvider, val result: Subject[Any]) extends MinimalActorRef {
  import RxSubjectActorRef._
  import AbstractRxActorRef.stateOffset
  import AbstractRxActorRef.watchedByOffset

  /**
   * As an optimization for the common (local) case we only register this RxSubjectActorRef
   * with the provider when the `path` member is actually queried, which happens during
   * serialization (but also during a simple call to `toString`, `equals` or `hashCode`!).
   *
   * Defined states:
   * null => started, path not yet created
   * Registering => currently creating temp path and registering it
   * path: ActorPath => path is available and was registered
   * StoppedWithPath(path) => stopped, path available
   * Stopped => stopped, path not yet created
   */
  @volatile
  private[this] var _stateDoNotCallMeDirectly: AnyRef = _

  @volatile
  private[this] var _watchedByDoNotCallMeDirectly: Set[ActorRef] = ActorCell.emptyActorRefSet

  @inline
  private[this] def watchedBy: Set[ActorRef] = Unsafe.instance.getObjectVolatile(this, watchedByOffset).asInstanceOf[Set[ActorRef]]

  @inline
  private[this] def updateWatchedBy(oldWatchedBy: Set[ActorRef], newWatchedBy: Set[ActorRef]): Boolean =
    Unsafe.instance.compareAndSwapObject(this, watchedByOffset, oldWatchedBy, newWatchedBy)

  @tailrec // Returns false if the subject is already completed
  private[this] final def addWatcher(watcher: ActorRef): Boolean = watchedBy match {
    case null => false
    case other => updateWatchedBy(other, other + watcher) || addWatcher(watcher)
  }

  @tailrec
  private[this] final def remWatcher(watcher: ActorRef): Unit = watchedBy match {
    case null => ()
    case other => if (!updateWatchedBy(other, other - watcher)) remWatcher(watcher)
  }

  @tailrec
  private[this] final def clearWatchers(): Set[ActorRef] = watchedBy match {
    case null => ActorCell.emptyActorRefSet
    case other => if (!updateWatchedBy(other, null)) clearWatchers() else other
  }

  @inline
  private[this] def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset)

  @inline
  private[this] def updateState(oldState: AnyRef, newState: AnyRef): Boolean =
    Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState)

  @inline
  private[this] def setState(newState: AnyRef): Unit = Unsafe.instance.putObjectVolatile(this, stateOffset, newState)

  override def getParent: InternalActorRef = provider.tempContainer

  def internalCallingThreadExecutionContext: ExecutionContext =
    provider.guardian.underlying.systemImpl.internalCallingThreadExecutionContext

  /**
   * Contract of this method:
   * Must always return the same ActorPath, which must have
   * been registered if we haven't been stopped yet.
   */
  @tailrec
  def path: ActorPath = state match {
    case null =>
      if (updateState(null, Registering)) {
        var p: ActorPath = null
        try {
          p = provider.tempPath()
          provider.registerTempActor(this, p)
          p
        } finally { setState(p) }
      } else path
    case p: ActorPath => p
    case StoppedWithPath(p) => p
    case Stopped =>
      // even if we are already stopped we still need to produce a proper path
      updateState(Stopped, StoppedWithPath(provider.tempPath()))
      path
    case Registering => path // spin until registration is completed
  }

  override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match {
    case Stopped | _: StoppedWithPath => provider.deadLetters ! message
    case _ =>
      if (message == null) throw new InvalidMessageException("Message is null")
      else
        message match {
          case n : Notification[Any] => n.accept(result)
          case other => result.onNext(other)
        }
  }

  override def sendSystemMessage(message: SystemMessage): Unit = message match {
    case _: Terminate => stop()
    case DeathWatchNotification(a, ec, at) => this.!(Terminated(a)(existenceConfirmed = ec, addressTerminated = at))
    case Watch(watchee, watcher) =>
      if (watchee == this && watcher != this) {
        if (!addWatcher(watcher))
          // NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS
          watcher.sendSystemMessage(DeathWatchNotification(watchee, existenceConfirmed = true, addressTerminated = false))
      } else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this))
    case Unwatch(watchee, watcher) =>
      if (watchee == this && watcher != this) remWatcher(watcher)
      else System.err.println("BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, this))
    case _ =>
  }

  @deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") override def isTerminated: Boolean = state match {
    case Stopped | _: StoppedWithPath => true
    case _ => false
  }

  @tailrec
  override def stop(): Unit = {
    def ensureCompleted(): Unit = {
      result.onError(new ActorKilledException("Stopped"))
      val watchers = clearWatchers()
      if (!watchers.isEmpty) {
        watchers foreach { watcher =>
          // NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS
          watcher.asInstanceOf[InternalActorRef]
            .sendSystemMessage(DeathWatchNotification(watcher, existenceConfirmed = true, addressTerminated = false))
        }
      }
    }
    state match {
      case null => // if path was never queried nobody can possibly be watching us, so we don't have to publish termination either
        if (updateState(null, Stopped)) ensureCompleted() else stop()
      case p: ActorPath =>
        if (updateState(p, StoppedWithPath(p))) { try ensureCompleted() finally provider.unregisterTempActor(p) } else stop()
      case Stopped | _: StoppedWithPath => // already stopped
      case Registering => stop() // spin until registration is completed before stopping
    }
  }
}

private[akka] object RxSubjectActorRef {
  private case object Registering
  private case object Stopped
  private final case class StoppedWithPath(path : ActorPath)

  def apply(provider: ActorRefProvider, timeout: Timeout, targetName: String): RxSubjectActorRef = {
    val result = Subject[Any]()
    new RxSubjectActorRef(provider, result)
    /*timeout logic moved to RxActorRef/Sel*/
  }
}

/*
 * This doesn't work, need to create as a Java class for some reason ...
final object AbstractRxActorRef {
  final val stateOffset = Unsafe.instance.objectFieldOffset(RxSubjectActorRef.getClass.getDeclaredField("_stateDoNotCallMeDirectly"))
  final val watchedByOffset = Unsafe.instance.objectFieldOffset(RxSubjectActorRef.getClass.getDeclaredField("_watchedByDoNotCallMeDirectly"))
}*/

package object RX extends RxSupport
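Update 2015-09-10
I thought I would add some simpler code here that implements the ?? operator. It differs slightly from the version above in that a) it does not support data over the network and b) it returns Observable[Observable[A]], which makes it easier to synchronize the responses. The advantage is that it does not mess with Akka's innards: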
object TypedAskSupport {
  import scala.concurrent.Future
  import akka.actor.{ActorRef,ActorSelection}
  import scala.reflect.ClassTag
  import rx.lang.scala.Observable

  // Subscribe[R] and Request[R] are the poster's own message types;
  // their definitions are not shown in the post.

  implicit class TypedAskableActorRef(actor : ActorRef) {
    val converted : akka.pattern.AskableActorRef = actor

    def ?[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout) : Future[Observable[R]] =
      converted.ask(topic).mapTo[Observable[R]]

    def ??[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[Observable[R]] =
      Observable.from (this.?[R](topic)(timeout))

    def ?[R](topic : Request[R])(implicit timeout : akka.util.Timeout) : Future[R] =
      converted.ask(topic).asInstanceOf[Future[R]]

    def ??[R](topic : Request[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[R] =
      Observable.from { this.?[R](topic)(timeout) }
  }

  implicit class TypedAskableActorSelection(actor : ActorSelection) {
    val converted : akka.pattern.AskableActorSelection = actor

    def ?[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout) : Future[Observable[R]] =
      converted.ask(topic).mapTo[Observable[R]]

    def ??[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[Observable[R]] =
      Observable.from (this.?[R](topic)(timeout))

    def ?[R](topic : Request[R])(implicit timeout : akka.util.Timeout) : Future[R] =
      converted.ask(topic).asInstanceOf[Future[R]]
  }
}