关闭Akka流以进行资源清理

ase*_*ell 3 akka apache-kafka akka-stream

使用Akka流时,是否有任何方法可以关闭/关闭不再需要资源清理的流?

编辑:当源由无限流组成时,它可能永远不会完成,我想在完成的源之前停止它。

用法示例:

Source.from(publisher)
      .map((p) -> p)
      .to(Sink.ignore())
      .run(materializer)
Run Code Online (Sandbox Code Playgroud)

有没有办法关闭流?

Ram*_*gil 5

您可以StreamActorMaterializer一段时间后独立运行on 并在ActorMaterializer上调用shutdown:

val actorSystem = ActorSystem()

val temporaryStream = {

  val localMat = ActorMaterializer()(actorSystem)

  import actorSystem.dispatcher
  actorSystem.scheduler.scheduleOnce(10 minutes) { localMat.shutdown() }

  Source.from(publisher)
        .map((p) -> p)
        .to(Sink.ignore())
        .run()(localMat)
}
Run Code Online (Sandbox Code Playgroud)

同样,您可以返回ActorMaterializer而不是物化流,并根据时间以外的其他外部条件关闭ActorMaterializer。