Spark RDD foreachPartition到S3

zax*_*xme 3 amazon-s3 apache-spark

我目前正在探索Spark。我面临以下任务-获取RDD,根据特定条件对其进行分区,然后将多个文件写入S3存储桶中的不同文件夹中。

一切正常,直到我们开始上传到S3部分。我在SO上阅读了所有与该问题有关的问题,发现我可以使用RDD AmazonS3Client或使用该saveToTextFile方法。我面临两个问题:

  1. 如果我选择“ AmazonS3Client我”,则java.io.NotSerializableException因为代码是从Spark驱动程序发送到工作程序的,因此需要进行序列化,并且显然AmazonS3Client不支持该代码。

  2. 如果我一起去,saveToTextFile我会面临类似的问题。当我进入foreachPartition循环时,我需要获取Iterable[T](在这种情况下p),因此,如果要使用它saveToTextFile,则需要创建Iterable的RDD,因此需要创建parallelize。问题在于,SparkContext sc也(理应如此)不会序列化。

rdd.foreachPartition { p => sc.parallelize(p.toSeq).saveAsTextFile(s"s3n://") }

任何帮助将不胜感激。

Mar*_*rco 5

没有必要这样做。您可以只使用saveAsTextFilerdd:

rdd.saveAsTextFile(s"s3n://dir/to/aux/file")
Run Code Online (Sandbox Code Playgroud)

saveAsTextFile将在一个包含文件许多部分(与分区一样多的部分)的文件夹中写入S3。然后,您可以根据需要合并到单个文件:

  def mergeToS3(srcPath: String, dstPath: String, sc: SparkContext): Unit = {
    val hadoopConfig = sc.hadoopConfiguration
    val fs = FileSystem.get(new URI(srcPath), hadoopConfig)
    FileUtil.copyMerge(fs, new Path(srcPath), fs, new Path(dstPath), true, hadoopConfig, null)
  }

  mergeToS3("s3n://dir/to/aux/file", "s3n://dir/to/singleFile",sc)
Run Code Online (Sandbox Code Playgroud)