akka stream:断开连接后如何重新连接TCP

it4*_*4rb 5 tcp scala akka-stream

我有一个简单的平衡器,通过TCP将作业分派给几个外部工作进程:

val sinkBalance =
Sink.fromGraph(GraphDSL.create() { implicit b =>
  val balancer = b.add(Balance[ByteString](workerCount))

  for (i <- 0 until workerCount) {

    val codec = Framing.simpleFramingProtocol(500).reversed
    val connection = Tcp(actorSystem).outgoingConnection("127.0.0.1", 3333+i)
    val extConnector = connection.join(codec)

    balancer.out(i) 
      ~> extConnector.async 
        ~> Sink.onComplete(_=>println(s"complete: $i"))
  }

  SinkShape(balancer.in)
})
Run Code Online (Sandbox Code Playgroud)

通常情况下效果很好.但是,当其中一个外部工作进程被杀死时,它的子流将完成(触发println语句),但是平衡器不知道如何:对于第一个下一个请求,它仍然将作业分配给死工人,并导致该请求超时.从第二个下一个请求开始恢复正常,平衡器再也不会使用死亡工人了.

所以我想问两个问题:

  1. 如何重新连接损坏的子流?在重新启动死亡工人之后,可能会自动或通过某些事件手动完成.

  2. 如何修改上面的代码,以便平衡器不再使用任何死亡工人?