it4*_*4rb 5 tcp scala akka-stream
我有一个简单的平衡器,通过TCP将作业分派给几个外部工作进程:
val sinkBalance =
Sink.fromGraph(GraphDSL.create() { implicit b =>
val balancer = b.add(Balance[ByteString](workerCount))
for (i <- 0 until workerCount) {
val codec = Framing.simpleFramingProtocol(500).reversed
val connection = Tcp(actorSystem).outgoingConnection("127.0.0.1", 3333+i)
val extConnector = connection.join(codec)
balancer.out(i)
~> extConnector.async
~> Sink.onComplete(_=>println(s"complete: $i"))
}
SinkShape(balancer.in)
})
Run Code Online (Sandbox Code Playgroud)
通常情况下效果很好.但是,当其中一个外部工作进程被杀死时,它的子流将完成(触发println语句),但是平衡器不知道如何:对于第一个下一个请求,它仍然将作业分配给死工人,并导致该请求超时.从第二个下一个请求开始恢复正常,平衡器再也不会使用死亡工人了.
所以我想问两个问题:
如何重新连接损坏的子流?在重新启动死亡工人之后,可能会自动或通过某些事件手动完成.
如何修改上面的代码,以便平衡器不再使用任何死亡工人?