通过键Spark写入多个输出 - 一个Spark作业

sam*_*est 63 hadoop scala hdfs output apache-spark

如何在单个作业中使用Spark写入依赖于键的多个输出.

相关:通过键Scalding Hadoop写入多个输出,一个MapReduce作业

例如

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)
Run Code Online (Sandbox Code Playgroud)

确保cat prefix/1

a
b
Run Code Online (Sandbox Code Playgroud)

cat prefix/2

c
Run Code Online (Sandbox Code Playgroud)

编辑:我最近添加了一个新的答案,其中包括完整的导入,皮条客和压缩编解码器,请参阅/sf/answers/3228263111/,除了之前的答案,这可能会有所帮助.

Nic*_*mas 113

如果您使用Spark 1.4+,由于DataFrame API,这变得更加容易.(DataFrames是在Spark 1.3中引入的,但partitionBy()我们需要的是1.4引入的.)

如果您刚开始使用RDD,则首先需要将其转换为DataFrame:

val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")
Run Code Online (Sandbox Code Playgroud)

在Python中,相同的代码是:

people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])
Run Code Online (Sandbox Code Playgroud)

拥有DataFrame后,基于特定键写入多个输出很简单.更重要的是 - 这就是DataFrame API的美妙之处 - Python,Scala,Java和R的代码几乎相同:

people_df.write.partitionBy("number").text("people")
Run Code Online (Sandbox Code Playgroud)

如果您需要,您可以轻松使用其他输出格式:

people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")
Run Code Online (Sandbox Code Playgroud)

在每个示例中,Spark将为我们对DataFrame进行分区的每个键创建一个子目录:

people/
  _SUCCESS
  number=1/
    part-abcd
    part-efgh
  number=2/
    part-abcd
    part-efgh
Run Code Online (Sandbox Code Playgroud)

  • @samthebest - 仅供参考,我回滚你的编辑,因为它有一些问题:它不符合我的写作风格; 我对数据集知之甚少,所以关于`Dataset [SomeCaseClass]`的注释更适合作为注释; 最后,Python没有`makeRDD()`方法. (74认同)
  • 注意,如果你有`Dataset [SomeCaseClass]`那么你可以调用`.toDF()`,列标签将匹配`SomeCaseClass`es字段.这提供了更多的类型安全性. (9认同)
  • @moustachio - 好问题.我想你可以通过在`partitionBy()`之前将DataFrame合并到一个分区来强制这样做.例如:`people_df.coalesce(1).write.partitionBy("number").text("people")`这可能会限制Spark在写出数据时的并行性,具体取决于您的数据和群集配置. (3认同)

zha*_*han 81

我会这样做,这是可扩展的

import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
    .map(a => (k, v)) // Your own implementation
    .partitionBy(new HashPartitioner(num))
    .saveAsHadoopFile("output/path", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])
    spark.stop()
  }
}
Run Code Online (Sandbox Code Playgroud)

刚看到上面的类似答案,但实际上我们不需要自定义分区.MultipleTextOutputFormat将为每个键创建文件.具有相同键的多个记录可以归入同一分区.

new HashPartitioner(num),其中num是您想要的分区号.如果您有大量不同的密钥,可以将数字设置为大.在这种情况下,每个分区都不会打开太多的hdfs文件处理程序.

  • 那不一样.散列分区将确保具有相同密钥的所有记录转到同一分区.我记得重新分区没有这个功能. (4认同)
  • 我正在寻找编写多个镶木地板输出,这个沿着这些线的解决方案看起来很有希望(只是直接子类化MultipleOutputFormat,而不是使用MultipleTextOutputFormat).不幸的是,MutlipleOutputFormat只存在于旧的API MR1/mapred中,而AvroParquetOutputFormat和ParquetOutputFormat(支持镶木地板)是针对新的API MR2/mapreduce编写的,所以看起来相同的路径是不开放的...... (3认同)
  • 看起来很棒!是否有python等价物? (3认同)
  • 请问你能添加所有必要的进口报表吗?我没有对此进行测试,但接受答案似乎是我想要的.`partitionBy(new Hashpartitioner(num))`有什么意义?这与`repartition(num)`不一样吗? (2认同)

Dan*_*bos 16

如果您可能为给定密钥设置了许多值,我认为可扩展的解决方案是为每个分区的每个密钥写出一个文件.不幸的是,在Spark中没有内置的支持,但是我们可以鞭打一些东西.

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
  .mapPartitionsWithIndex { (p, it) =>
    val outputs = new MultiWriter(p.toString)
    for ((k, v) <- it) {
      outputs.write(k.toString, v)
    }
    outputs.close
    Nil.iterator
  }
  .foreach((x: Nothing) => ()) // To trigger the job.

// This one is Local, but you could write one for HDFS
class MultiWriter(suffix: String) {
  private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
  def write(key: String, value: Any) = {
    if (!writers.contains(key)) {
      val f = new java.io.File("output/" + key + "/" + suffix)
      f.getParentFile.mkdirs
      writers(key) = new java.io.PrintWriter(f)
    }
    writers(key).println(value)
  }
  def close = writers.values.foreach(_.close)
}
Run Code Online (Sandbox Code Playgroud)

(替换PrintWriter为您选择的分布式文件系统操作.)

这使得一次通过RDD并且不执行随机播放.它为每个键提供一个目录,每个目录中包含许多文件.


sam*_*est 7

这包括请求的编解码器,必要的导入和请求的pimp.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
  def writeAsMultiple(prefix: String, codec: String,
                      keyName: String = "key")
                     (implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    rdd.toDF(keyName, "_2").write.partitionBy(keyName)
    .format("text").option("codec", codec).save(prefix)
  }
}

val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
Run Code Online (Sandbox Code Playgroud)

与OP的一个细微差别是它将<keyName>=为目录名称添加前缀.例如

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
Run Code Online (Sandbox Code Playgroud)

会给:

prefix/key=1/part-00000
prefix/key=2/part-00000
Run Code Online (Sandbox Code Playgroud)

哪里prefix/my_number=1/part-00000将包含行ab,prefix/my_number=2/part-00000并将包含该行c.

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")
Run Code Online (Sandbox Code Playgroud)

会给:

prefix/foo=1/part-00000
prefix/foo=2/part-00000
Run Code Online (Sandbox Code Playgroud)

应该清楚如何编辑parquet.

最后下面是一个例子Dataset,使用元组可能更好.

implicit class PimpedDataset[T](dataset: Dataset[T]) {
  def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
    dataset.write.partitionBy(field)
    .format("text").option("codec", codec).save(prefix)
  }
}
Run Code Online (Sandbox Code Playgroud)

  • @Aliostad,看看日期,这是一年半后发布的。此外,在 SO 上_不_习惯(有时被认为是粗鲁的)发布您自己问题的答案(在它已经有一个或多个有效问题之后)并接受它。有时,一种情况需要多个答案,但是您通常会保留接受原始答案(除非结果证明是错误的,或者来自其他用户的新答案要好得多,但这里的情况并非如此,OP 清楚地考虑了原始答案正确)。我只能假设 OP 不知道这种情况下的准则。 (3认同)
  • 最上面的答案实际上是最好的,看来你基本上复制了他的答案。 (3认同)
  • @Abel我知道准则,但是我觉得有必要发布自己的答案,因为我的答案“比这里要好得多”,因为这是唯一的答案:1.包括如何指定压缩编解码器(如在OP中要求); 2。包括如何将其添加为pimp / extension方法(在OP中要求); 3。实际编译!(包括必要的导入),4.使用正确的Scala样式和格式。令人遗憾的是,快到2019年了,并不是每个人都可以编写可编译的代码,也不是正确的样式。 (2认同)