Akka流-失败后恢复带有广播和zip的图形

use*_*192 5 scala stream akka akka-stream

我有一个带有广播和zip的流程图。如果在此流程中某件事(无论是什么)失败了,我想删除传递给它的有问题的元素,然后继续。我想出了以下解决方案:

val flow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val dangerousFlow = Flow[Int].map {
    case 5 => throw new RuntimeException("BOOM!")
    case x => x
  }
  val safeFlow = Flow[Int]
  val bcast = builder.add(Broadcast[Int](2))
  val zip = builder.add(Zip[Int, Int])

  bcast ~> dangerousFlow ~> zip.in0
  bcast ~> safeFlow ~> zip.in1

  FlowShape(bcast.in, zip.out)
})

Source(1 to 9)
  .via(flow)
  .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
  .runWith(Sink.foreach(println))
Run Code Online (Sandbox Code Playgroud)

我希望它能打印:

(1,1)
(2,2)
(3,3)
(4,4)
(5,5)
(6,6)
(7,7)
(8,8)
(9,9)
Run Code Online (Sandbox Code Playgroud)

但是,它陷入僵局,仅打印:

(1,1)
(2,2)
(3,3)
(4,4)
Run Code Online (Sandbox Code Playgroud)

我们已经进行了一些调试,结果发现它对子级应用了“恢复”策略,导致子级dangerousFlow在失败后恢复,因此需要从中获取元素bcastbcastsafeFlow需要另一个元素之前,它不会发出一个元素,而实际上这是永远不会发生的(因为它正在等待的需求zip)。

有没有一种方法可以恢复图表,而不管其中一个阶段出了什么问题?

Fre*_* A. 4

我认为你很好地理解了这个问题。您看到,当您的元素5崩溃时,您还应该停止正在经历的dangerousFlow元素,因为如果它到达阶段,您就会遇到您所描述的问题。我不知道如何在和阶段之间解决你的问题,但是如何将问题进一步推进,哪里更容易处理?5safeFlowzipbroadcastzip

考虑使用以下内容dangerousFlow

import scala.util._
val dangerousFlow = Flow[Int].map {
  case 5 => Failure(new RuntimeException("BOOM!"))
  case x => Success(x)
}
Run Code Online (Sandbox Code Playgroud)

即使出现问题,dangerousFlow仍然会发出数据。然后,您可以zip像当前正在做的那样,只需要添加一个collect阶段作为图表的最后一步。在流程上,这看起来像:

Flow[(Try[Int], Int)].collect {
  case (Success(s), i) => s -> i
}
Run Code Online (Sandbox Code Playgroud)

现在,如果正如您所写,您确实希望它输出元(5, 5)组,请使用以下命令:

Flow[(Try[Int], Int)].collect {
  case (Success(s), i) => s -> i
  case (_, i)          => i -> i
}
Run Code Online (Sandbox Code Playgroud)