在Akka Streams中创建一个来自actor的流程

Ori*_*ski 23 scala akka akka-stream

可以分别使用actor Source.actorPublisher()Sink.actorSubscriber()方法从actor创建源和接收器.但是有可能创建一个Flow演员吗?

从概念上讲,似乎并没有一个很好的理由,因为它实现了两者ActorPublisherActorSubscriber特征,但不幸的是,该Flow对象没有任何方法可以做到这一点.在这篇优秀的博客文章中,它是在早期版本的Akka Streams中完成的,所以问题是它是否也可以在最新版本(2.4.9)中完成.

Kon*_*ski 40

我是Akka团队的成员,并希望使用此问题来澄清有关原始Reactive Streams接口的一些内容.我希望你会发现这很有用.

最值得注意的是,我们将很快在Akka团队博客上发布关于构建自定义阶段(包括Flows)的多个帖子,因此请密切关注它.

不要使用ActorPublisher/ActorSubscriber

请不要使用ActorPublisherActorSubscriber.它们的级别太低,您最终可能会以违反Reactive Streams规范的方式实现它们.它们是过去的遗留物,即便如此,它们只是"仅限用户模式".现在没有理由使用这些课程.我们从来没有提供过构建流的方法,因为如果它被公开为"原始"Actor API以便实现并正确实现所有规则,那么复杂性就是爆炸性的.

如果您真的想要实现原始的ReactiveStreams接口,那么请使用规范的TCK来验证您的实现是否正确.你可能会被一些更复杂的角落情况a Flow(或者在RS术语Processor中必须处理)猝不及防.

大多数操作都可以在不进行低级别的情况下构建

您应该能够通过构建a Flow[T]并在其上添加所需的操作来简单地构建许多流,就像一个例子:

val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt)
Run Code Online (Sandbox Code Playgroud)

这是Flow的可重用描述.

由于您在询问高级用户模式,因此这是DSL本身最强大的运营商:statefulFlatMapConcat.绝大多数在普通流元素上运行的操作都可以使用它来表达:Flow.statefulMapConcat[T](f: () ? (Out) ? Iterable[T]): Repr[T].

如果你需要定时器,你可以zipSource.timer

GraphStage是构建自定义阶段最简单,最安全的 API

取而代之的是,建设资源/流/沉没了自己的强大和安全的 API:对GraphStage.请阅读有关建立自定义GraphStages文件(也可以是一个漏/源/流量甚至任何任意形状).它为您处理所有复杂的Reactive Streams规则,同时在实现阶段(可能是Flow)时为您提供完全的自由和类型安全.

例如,从docs中获取的是filter(T => Boolean)运算符的GraphStage实现:

class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {

  val in = Inlet[A]("Filter.in")
  val out = Outlet[A]("Filter.out")

  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          if (p(elem)) push(out, elem)
          else pull(in)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}
Run Code Online (Sandbox Code Playgroud)

它还处理异步通道,默认情况下是可熔的.

除了文档之外,这些博客文章还详细解释了为什么这个API是构建任何形状的自定义阶段的圣杯:


Ram*_*gil 20

Konrad的解决方案演示了如何创建一个使用Actors的自定义舞台,但在大多数情况下,我认为这有点矫枉过正.

通常你有一些能够回答问题的Actor:

val actorRef : ActorRef = ???

type Input = ???
type Output = ???

val queryActor : Input => Future[Output] = 
  (actorRef ? _) andThen (_.mapTo[Output])
Run Code Online (Sandbox Code Playgroud)

这可以很容易地利用基本Flow功能,它可以获得最大数量的并发请求:

val actorQueryFlow : Int => Flow[Input, Output, _] =
  (parallelism) => Flow[Input].mapAsync[Output](parallelism)(queryActor)
Run Code Online (Sandbox Code Playgroud)

现在actorQueryFlow可以集成到任何流中......

  • 我实际上同意,应该抽空修改我的答案...如果你有时间,请随时编辑它!应该解释这两种方式,推荐方式 (5认同)
  • @ Konrad'ktoso'Malawski感谢您验证我的答案.还要感谢akka的所有工作.你们正在做一些非常酷的事情. (4认同)