zax*_*xme 3 amazon-s3 apache-spark
我目前正在探索Spark。我面临以下任务-获取RDD,根据特定条件对其进行分区,然后将多个文件写入S3存储桶中的不同文件夹中。
一切正常,直到我们开始上传到S3部分。我在SO上阅读了所有与该问题有关的问题,发现我可以使用RDD AmazonS3Client或使用该saveToTextFile方法。我面临两个问题:
如果我选择“ AmazonS3Client我”,则java.io.NotSerializableException因为代码是从Spark驱动程序发送到工作程序的,因此需要进行序列化,并且显然AmazonS3Client不支持该代码。
如果我一起去,saveToTextFile我会面临类似的问题。当我进入foreachPartition循环时,我需要获取Iterable[T](在这种情况下p),因此,如果要使用它saveToTextFile,则需要创建Iterable的RDD,因此需要创建parallelize。问题在于,SparkContext sc也(理应如此)不会序列化。
rdd.foreachPartition { p =>
sc.parallelize(p.toSeq).saveAsTextFile(s"s3n://")
}
任何帮助将不胜感激。
没有必要这样做。您可以只使用saveAsTextFilerdd:
rdd.saveAsTextFile(s"s3n://dir/to/aux/file")
Run Code Online (Sandbox Code Playgroud)
saveAsTextFile将在一个包含文件许多部分(与分区一样多的部分)的文件夹中写入S3。然后,您可以根据需要合并到单个文件:
def mergeToS3(srcPath: String, dstPath: String, sc: SparkContext): Unit = {
val hadoopConfig = sc.hadoopConfiguration
val fs = FileSystem.get(new URI(srcPath), hadoopConfig)
FileUtil.copyMerge(fs, new Path(srcPath), fs, new Path(dstPath), true, hadoopConfig, null)
}
mergeToS3("s3n://dir/to/aux/file", "s3n://dir/to/singleFile",sc)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
752 次 |
| 最近记录: |