如何对 ActorPublisher 进行反压

Saj*_*lva 1 scala reactive-programming akka reactive-streams akka-stream

我正在编写一些示例来理解 akka 流和背压。我正在尝试了解缓慢的消费者背压对 AkkaPublisher 有何影响

我的代码如下。

class DataPublisher extends ActorPublisher[Int] {

  import akka.stream.actor.ActorPublisherMessage._

  var items: List[Int] = List.empty

  def receive = {
    case s: String =>
      println(s"Producer buffer size ${items.size}")
      if (totalDemand == 0)
        items = items :+ s.toInt
      else
        onNext(s.toInt)

    case Request(demand) =>
      if (demand > items.size) {
        items foreach (onNext)
        items = List.empty
      }
      else {
        val (send, keep) = items.splitAt(demand.toInt)
        items = keep
        send foreach (onNext)
      }


    case other =>
      println(s"got other $other")
  }
}
Run Code Online (Sandbox Code Playgroud)

Source.fromPublisher(ActorPublisher[Int](dataPublisherRef)).runWith(sink)
Run Code Online (Sandbox Code Playgroud)

其中接收器是订阅者,具有睡眠来模拟慢速消费者。无论如何,发布者都会继续生成数据。

--编辑-- 我的问题是当需求为 0 时以编程方式缓冲数据。如何利用背压来减慢发布者的速度

就像是

throttledSource().buffer(10, OverflowStrategy.backpressure).runWith(throttledSink())
Run Code Online (Sandbox Code Playgroud)

这不会影响发布者并且其缓冲区会继续运行。

谢谢,萨吉思

Kon*_*ski 5

不要使用 ActorPublisher

\n\n

首先,不要使用ActorPublisher- 它是一个非常低级且已弃用的 API。我们决定弃用,因为用户不应该在 Akka Streams 中处理如此低的抽象级别。

\n\n

其中一件棘手的事情正是您所询问的——如果使用此 API,则处理背压完全由编写 ActorPublisher 的开发人员负责。因此,您必须接收Request(n)消息并确保发出的元素数量永远不会超过您请求的数量。此行为在反应流规范中指定,然后您必须正确实现该规范。基本上,您会接触到 Reactive Streams 的所有复杂性(这是一个完整的规范,有许多边缘情况 - 免责声明:我曾经/正在开发 Reactive Streams 以及 Akka Streams)。

\n\n

显示 GraphStage 中背压的表现方式

\n\n

其次,要构建自定义阶段,您应该使用为其设计的 API GraphStage:. 请注意,这个阶段的级别也相当低。通常,Akka Streams 的用户不需要编写自定义阶段,但是如果他们要实现内置阶段不提供的一些逻辑,那么编写自己的阶段绝对是可以预期的并且很好。

\n\n

这是来自 Akka 代码库的简化 Filter 实现:

\n\n
\ncase class Filter[T](p: T \xe2\x87\x92 Boolean) extends SimpleLinearGraphStage[T] {\n  override def initialAttributes: Attributes = DefaultAttributes.filter\n\n  override def toString: String = "Filter"\n\n  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =\n    new GraphStageLogic(shape) with OutHandler with InHandler {\n\n    override def onPush(): Unit = {\n      val elem = grab(in)\n      if (p(elem)) push(out, elem)\n      else pull(in)\n    }\n\n    // this method will NOT be called, if the downstream has not signalled enough demand!\n    // this method being NOT called is how back-pressure manifests in stages\n    override def onPull(): Unit = pull(in)\n\n    setHandlers(in, out, this)\n  }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

正如您所看到的,您不需要自己实现整个响应式流逻辑和规则(这很困难),而是获得简单的回调,例如onPushonPull。Akka Streams 处理需求管理,onPull如果下游发出需求信号,它会自动调用,如果没有需求,它不会调用它——这意味着下游正在向这个阶段施加背压。

\n