zax*_*xme 3 amazon-s3 apache-spark
我目前正在探索Spark。我面临以下任务-获取RDD,根据特定条件对其进行分区,然后将多个文件写入S3存储桶中的不同文件夹中。
一切正常,直到我们开始上传到S3部分。我在SO上阅读了所有与该问题有关的问题,发现我可以使用RDD AmazonS3Client
或使用该saveToTextFile
方法。我面临两个问题:
如果我选择“ AmazonS3Client
我”,则java.io.NotSerializableException
因为代码是从Spark驱动程序发送到工作程序的,因此需要进行序列化,并且显然AmazonS3Client不支持该代码。
如果我一起去,saveToTextFile
我会面临类似的问题。当我进入foreachPartition
循环时,我需要获取Iterable[T]
(在这种情况下p
),因此,如果要使用它saveToTextFile
,则需要创建Iterable的RDD,因此需要创建parallelize
。问题在于,SparkContext sc
也(理应如此)不会序列化。
rdd.foreachPartition { p =>
sc.parallelize(p.toSeq).saveAsTextFile(s"s3n://")
}
任何帮助将不胜感激。
没有必要这样做。您可以只使用saveAsTextFile
rdd:
rdd.saveAsTextFile(s"s3n://dir/to/aux/file")
Run Code Online (Sandbox Code Playgroud)
saveAsTextFile
将在一个包含文件许多部分(与分区一样多的部分)的文件夹中写入S3。然后,您可以根据需要合并到单个文件:
def mergeToS3(srcPath: String, dstPath: String, sc: SparkContext): Unit = {
val hadoopConfig = sc.hadoopConfiguration
val fs = FileSystem.get(new URI(srcPath), hadoopConfig)
FileUtil.copyMerge(fs, new Path(srcPath), fs, new Path(dstPath), true, hadoopConfig, null)
}
mergeToS3("s3n://dir/to/aux/file", "s3n://dir/to/singleFile",sc)
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
752 次 |
最近记录: |