监控已关闭的图形Akka Stream

0__*_*0__ 8 scala akka-stream

如果我RunningGraph在Akka Stream 创建了一个,我怎么知道(从外面)

  1. 当所有节点因完成而被取消时?
  2. 当所有节点因错误而停止时?

Vla*_*eev 19

我不认为有任何方法可以为任意图形执行此操作,但如果您控制了图形,则只需将监视接收器连接到每个节点的输出,这些输出可能会失败或完成(这些节点是至少一个输出),例如:

import akka.actor.Status

// obtain graph parts (this can be done inside the graph building as well)
val source: Source[Int, NotUsed] = ...
val flow: Flow[Int, String, NotUsed] = ...
val sink: Sink[String, NotUsed] = ...

// create monitoring actors
val aggregate = actorSystem.actorOf(Props[Aggregate])
val sourceMonitorActor = actorSystem.actorOf(Props(new Monitor("source", aggregate)))
val flowMonitorActor = actorSystem.actorOf(Props(new Monitor("flow", aggregate)))

// create the graph
val graph = GraphDSL.create() { implicit b =>
   import GraphDSL._

   val sourceMonitor = b.add(Sink.actorRef(sourceMonitorActor, Status.Success(()))),
   val flowMonitor = b.add(Sink.actorRef(flowMonitorActor, Status.Success(())))

   val bc1 = b.add(Broadcast[Int](2))
   val bc2 = b.add(Broadcast[String](2))

   // main flow
   source ~> bc1 ~> flow ~> bc2 ~> sink

   // monitoring branches
   bc1 ~> sourceMonitor
   bc2 ~> flowMonitor

   ClosedShape
}

// run the graph
RunnableGraph.fromGraph(graph).run()

class Monitor(name: String, aggregate: ActorRef) extends Actor {
  override def receive: Receive = {
    case Status.Success(_) => aggregate ! s"$name completed successfully"
    case Status.Failure(e) => aggregate ! s"$name completed with failure: ${e.getMessage}"
    case _ =>
  }
}

class Aggregate extends Actor {
  override def receive: Receive = {
    case s: String => println(s)
  }
}
Run Code Online (Sandbox Code Playgroud)

也可以只创建一个监视actor并在所有监视接收器中使用它,但在这种情况下,您将无法在失败的流之间轻松区分.

并且还有watchTermination()源和流的方法允许实现与此时的流一起终止的未来.我认为它可能很难使用GraphDSL,但使用常规流方法它可能看起来像这样:

import akka.Done
import akka.actor.Status
import akka.pattern.pipe

val monitor = actorSystem.actorOf(Props[Monitor])
source
  .watchTermination()((f, _) => f pipeTo monitor) 
  .via(flow).watchTermination((f, _) => f pipeTo monitor)
  .to(sink)
  .run()

class Monitor extends Actor {
  override def receive: Receive = {
    case Done => println("stream completed")
    case Status.Failure(e) => println(s"stream failed: ${e.getMessage}")
  }
}
Run Code Online (Sandbox Code Playgroud)

您可以在将其值传递给actor之前转换未来,以区分流.