Ori*_*ski 23 scala akka akka-stream
可以分别使用actor Source.actorPublisher()和Sink.actorSubscriber()方法从actor创建源和接收器.但是有可能创建一个Flow演员吗?
从概念上讲,似乎并没有一个很好的理由,因为它实现了两者ActorPublisher和ActorSubscriber特征,但不幸的是,该Flow对象没有任何方法可以做到这一点.在这篇优秀的博客文章中,它是在早期版本的Akka Streams中完成的,所以问题是它是否也可以在最新版本(2.4.9)中完成.
Kon*_*ski 40
我是Akka团队的成员,并希望使用此问题来澄清有关原始Reactive Streams接口的一些内容.我希望你会发现这很有用.
最值得注意的是,我们将很快在Akka团队博客上发布关于构建自定义阶段(包括Flows)的多个帖子,因此请密切关注它.
不要使用ActorPublisher/ActorSubscriber
请不要使用ActorPublisher和ActorSubscriber.它们的级别太低,您最终可能会以违反Reactive Streams规范的方式实现它们.它们是过去的遗留物,即便如此,它们只是"仅限用户模式".现在没有理由使用这些课程.我们从来没有提供过构建流的方法,因为如果它被公开为"原始"Actor API以便实现并正确实现所有规则,那么复杂性就是爆炸性的.
如果您真的想要实现原始的ReactiveStreams接口,那么请使用规范的TCK来验证您的实现是否正确.你可能会被一些更复杂的角落情况a Flow(或者在RS术语Processor中必须处理)猝不及防.
大多数操作都可以在不进行低级别的情况下构建
您应该能够通过构建a Flow[T]并在其上添加所需的操作来简单地构建许多流,就像一个例子:
val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt)
Run Code Online (Sandbox Code Playgroud)
这是Flow的可重用描述.
由于您在询问高级用户模式,因此这是DSL本身最强大的运营商:statefulFlatMapConcat.绝大多数在普通流元素上运行的操作都可以使用它来表达:Flow.statefulMapConcat[T](f: () ? (Out) ? Iterable[T]): Repr[T].
如果你需要定时器,你可以zip用Source.timer等
GraphStage是构建自定义阶段的最简单,最安全的 API
取而代之的是,建设资源/流/沉没了自己的强大和安全的 API:对GraphStage.请阅读有关建立自定义GraphStages文件(也可以是一个漏/源/流量甚至任何任意形状).它为您处理所有复杂的Reactive Streams规则,同时在实现阶段(可能是Flow)时为您提供完全的自由和类型安全.
例如,从docs中获取的是filter(T => Boolean)运算符的GraphStage实现:
class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Filter.in")
val out = Outlet[A]("Filter.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (p(elem)) push(out, elem)
else pull(in)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
Run Code Online (Sandbox Code Playgroud)
它还处理异步通道,默认情况下是可熔的.
除了文档之外,这些博客文章还详细解释了为什么这个API是构建任何形状的自定义阶段的圣杯:
Ram*_*gil 20
Konrad的解决方案演示了如何创建一个使用Actors的自定义舞台,但在大多数情况下,我认为这有点矫枉过正.
通常你有一些能够回答问题的Actor:
val actorRef : ActorRef = ???
type Input = ???
type Output = ???
val queryActor : Input => Future[Output] =
(actorRef ? _) andThen (_.mapTo[Output])
Run Code Online (Sandbox Code Playgroud)
这可以很容易地利用基本Flow功能,它可以获得最大数量的并发请求:
val actorQueryFlow : Int => Flow[Input, Output, _] =
(parallelism) => Flow[Input].mapAsync[Output](parallelism)(queryActor)
Run Code Online (Sandbox Code Playgroud)
现在actorQueryFlow可以集成到任何流中......