Saj*_*lva 1 scala reactive-programming akka reactive-streams akka-stream
我正在编写一些示例来理解 akka 流和背压。我正在尝试了解缓慢的消费者背压对 AkkaPublisher 有何影响
我的代码如下。
class DataPublisher extends ActorPublisher[Int] {
import akka.stream.actor.ActorPublisherMessage._
var items: List[Int] = List.empty
def receive = {
case s: String =>
println(s"Producer buffer size ${items.size}")
if (totalDemand == 0)
items = items :+ s.toInt
else
onNext(s.toInt)
case Request(demand) =>
if (demand > items.size) {
items foreach (onNext)
items = List.empty
}
else {
val (send, keep) = items.splitAt(demand.toInt)
items = keep
send foreach (onNext)
}
case other =>
println(s"got other $other")
}
}
Run Code Online (Sandbox Code Playgroud)
和
Source.fromPublisher(ActorPublisher[Int](dataPublisherRef)).runWith(sink)
Run Code Online (Sandbox Code Playgroud)
其中接收器是订阅者,具有睡眠来模拟慢速消费者。无论如何,发布者都会继续生成数据。
--编辑-- 我的问题是当需求为 0 时以编程方式缓冲数据。如何利用背压来减慢发布者的速度
就像是
throttledSource().buffer(10, OverflowStrategy.backpressure).runWith(throttledSink())
Run Code Online (Sandbox Code Playgroud)
这不会影响发布者并且其缓冲区会继续运行。
谢谢,萨吉思
首先,不要使用ActorPublisher
- 它是一个非常低级且已弃用的 API。我们决定弃用,因为用户不应该在 Akka Streams 中处理如此低的抽象级别。
其中一件棘手的事情正是您所询问的——如果使用此 API,则处理背压完全由编写 ActorPublisher 的开发人员负责。因此,您必须接收Request(n)
消息并确保发出的元素数量永远不会超过您请求的数量。此行为在反应流规范中指定,然后您必须正确实现该规范。基本上,您会接触到 Reactive Streams 的所有复杂性(这是一个完整的规范,有许多边缘情况 - 免责声明:我曾经/正在开发 Reactive Streams 以及 Akka Streams)。
其次,要构建自定义阶段,您应该使用为其设计的 API GraphStage
:. 请注意,这个阶段的级别也相当低。通常,Akka Streams 的用户不需要编写自定义阶段,但是如果他们要实现内置阶段不提供的一些逻辑,那么编写自己的阶段绝对是可以预期的并且很好。
这是来自 Akka 代码库的简化 Filter 实现:
\n\n\ncase class Filter[T](p: T \xe2\x87\x92 Boolean) extends SimpleLinearGraphStage[T] {\n override def initialAttributes: Attributes = DefaultAttributes.filter\n\n override def toString: String = "Filter"\n\n override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =\n new GraphStageLogic(shape) with OutHandler with InHandler {\n\n override def onPush(): Unit = {\n val elem = grab(in)\n if (p(elem)) push(out, elem)\n else pull(in)\n }\n\n // this method will NOT be called, if the downstream has not signalled enough demand!\n // this method being NOT called is how back-pressure manifests in stages\n override def onPull(): Unit = pull(in)\n\n setHandlers(in, out, this)\n }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n正如您所看到的,您不需要自己实现整个响应式流逻辑和规则(这很困难),而是获得简单的回调,例如onPush
和onPull
。Akka Streams 处理需求管理,onPull
如果下游发出需求信号,它会自动调用,如果没有需求,它不会调用它——这意味着下游正在向这个阶段施加背压。