流结束后如何关闭文件?

Leo*_*ard 5 scala akka-stream

我有一个 java.io.File 序列流。我flatMapConcat用于创建新Source文件,类似这样:

def func(files: List[File]) =
  Source(files)
    .map(f => openFile(f))
    .flatMapConcat(f => Source.fromPublisher(SomePublisher(f)))
    .grouped(10)
    .via(SomeFlow)
    .runWith(Sink.ignore)
Run Code Online (Sandbox Code Playgroud)

流结束后是否有一种简单的方法可以关闭每个文件?SomePublisher()无法关闭它。

Leo*_*ard 2

因此,我找到了解决我的问题的多种方法中的一种好方法,但如果您有其他方法,我也希望看到它。

def someSource(file: File) = {
  val f = openFile(file)

  Source
    .fromPublisher(SomePublisher(f))
    .transform(() => new PushStage[?, ?] {
      override def onPush(elem: ?, ctx: Context[?]): SyncDirective = ctx.push(elem)

      override def postStop(): Unit = {
        f.close()
        super.postStop()
      }
    }
}

def func(files: List[File]) =
  Source(files)
    .flatMapConcat(someSource)
    .grouped(10)
    .via(SomeFlow)
    .runWith(Sink.ignore)
Run Code Online (Sandbox Code Playgroud)