我有一个 java.io.File 序列流。我flatMapConcat用于创建新Source文件,类似这样:
def func(files: List[File]) =
Source(files)
.map(f => openFile(f))
.flatMapConcat(f => Source.fromPublisher(SomePublisher(f)))
.grouped(10)
.via(SomeFlow)
.runWith(Sink.ignore)
Run Code Online (Sandbox Code Playgroud)
流结束后是否有一种简单的方法可以关闭每个文件?SomePublisher()无法关闭它。
因此,我找到了解决我的问题的多种方法中的一种好方法,但如果您有其他方法,我也希望看到它。
def someSource(file: File) = {
val f = openFile(file)
Source
.fromPublisher(SomePublisher(f))
.transform(() => new PushStage[?, ?] {
override def onPush(elem: ?, ctx: Context[?]): SyncDirective = ctx.push(elem)
override def postStop(): Unit = {
f.close()
super.postStop()
}
}
}
def func(files: List[File]) =
Source(files)
.flatMapConcat(someSource)
.grouped(10)
.via(SomeFlow)
.runWith(Sink.ignore)
Run Code Online (Sandbox Code Playgroud)