我用下一个代码:
csv.saveAsTextFile(pathToResults, classOf[GzipCodec])
Run Code Online (Sandbox Code Playgroud)
pathToResults目录有很多文件,如part-0000,part-0001等.我可以使用FileUtil.copyMerge(),但它真的很慢,它下载驱动程序上的所有文件,然后将它们上传到hadoop.但FileUtil.copyMerge()比以下更快:
csv.repartition(1).saveAsTextFile(pathToResults, classOf[GzipCodec])
Run Code Online (Sandbox Code Playgroud)
如何在没有重新分区和FileUtil.copyMerge()的情况下合并spark结果文件?
我有一个 java.io.File 序列流。我flatMapConcat用于创建新Source文件,类似这样:
def func(files: List[File]) =
Source(files)
.map(f => openFile(f))
.flatMapConcat(f => Source.fromPublisher(SomePublisher(f)))
.grouped(10)
.via(SomeFlow)
.runWith(Sink.ignore)
Run Code Online (Sandbox Code Playgroud)
流结束后是否有一种简单的方法可以关闭每个文件?SomePublisher()无法关闭它。