小编Leo*_*ard的帖子

如何在没有重新分区和copyMerge的情况下合并spark结果文件?

我用下一个代码:

csv.saveAsTextFile(pathToResults, classOf[GzipCodec])
Run Code Online (Sandbox Code Playgroud)

pathToResults目录有很多文件,如part-0000,part-0001等.我可以使用FileUtil.copyMerge(),但它真的很慢,它下载驱动程序上的所有文件,然后将它们上传到hadoop.但FileUtil.copyMerge()比以下更快:

csv.repartition(1).saveAsTextFile(pathToResults, classOf[GzipCodec])
Run Code Online (Sandbox Code Playgroud)

如何在没有重新分区和FileUtil.copyMerge()的情况下合并spark结果文件?

hadoop scala apache-spark

7
推荐指数
1
解决办法
2万
查看次数

流结束后如何关闭文件?

我有一个 java.io.File 序列流。我flatMapConcat用于创建新Source文件,类似这样:

def func(files: List[File]) =
  Source(files)
    .map(f => openFile(f))
    .flatMapConcat(f => Source.fromPublisher(SomePublisher(f)))
    .grouped(10)
    .via(SomeFlow)
    .runWith(Sink.ignore)
Run Code Online (Sandbox Code Playgroud)

流结束后是否有一种简单的方法可以关闭每个文件?SomePublisher()无法关闭它。

scala akka-stream

5
推荐指数
1
解决办法
693
查看次数

标签 统计

scala ×2

akka-stream ×1

apache-spark ×1

hadoop ×1