用于akka流Source.actorRef缓冲区和OverflowStrategy的接收器折叠

Xia*_*ong 1 scala akka akka-stream

这是akka文档的代码片段

val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)

val (ref, future) = Source.actorRef(3, OverflowStrategy.fail)
  .toMat(sinkUnderTest)(Keep.both).run()

ref ! 1
ref ! 2
ref ! 3
ref ! akka.actor.Status.Success("done")

val result = Await.result(future, 3.seconds)
assert(result == "123")
Run Code Online (Sandbox Code Playgroud)

这是一个有效的代码片段,但是,如果我使用ref来告诉另一条消息ref ! 4,我会得到一个例外akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 3)

我想缓冲区大小3应该足够了.原因是折叠操作是(acc,ele)=> acc,所以它需要累加器和元素来返回新的值累加器.

所以我改变了代码让另一个演员告诉等待3秒.它再次运作.

  val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)

  private val (ref, future): (ActorRef, Future[String]) = Source.actorRef(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()

  ref ! 1
  ref ! 2
  ref ! 3
  Thread.sleep(3000)
  ref ! 4
  ref ! akka.actor.Status.Success("done")

  val result = Await.result(future, 10.seconds)

  println(result)
Run Code Online (Sandbox Code Playgroud)

但是,我的问题是,有没有办法告诉Akka流减速或等待接收器可用.我也在使用OverflowStrategy.backpressure,但它说Backpressure overflowStrategy not supported.

还有其他选择吗?

Ste*_*tti 6

您应该考虑将Source.queue一种方法从背景中以背压方式注入流中.

Source.queue将实现为您可以提供元素的队列对象,但是当您提供元素时,您将Future在流准备接受消息时返回完成.

示例如下:

  val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)

  val (queue, future): (SourceQueueWithComplete[Int], Future[String]) =
    Source.queue(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()

  Future.sequence(Seq(
    queue.offer(1),
    queue.offer(2),
    queue.offer(3),
    queue.offer(4)
  ))

  queue.complete()

  val result = Await.result(future, 10.seconds)

  println(result)
Run Code Online (Sandbox Code Playgroud)

更多信息在文档中.