sam*_*est 63 hadoop scala hdfs output apache-spark
How can I write to multiple outputs dependent on the key using Spark in a single job?
Related: Write to multiple outputs by key Scalding Hadoop, one MapReduce Job
For example,
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)
would ensure that cat prefix/1 is
a
b
and cat prefix/2 would be
c
EDIT: I recently added a new answer that includes full imports, the pimp, and a compression codec; see /sf/answers/3228263111/, which may be helpful in addition to the earlier answers.
Nic*_*mas 113
If you use Spark 1.4+, this has become much, much easier thanks to the DataFrame API. (DataFrames were introduced in Spark 1.3, but partitionBy(), which we need, was introduced in 1.4.)
If you're starting out with an RDD, you'll first need to convert it to a DataFrame:
val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")
In Python, the same code is:
people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])
Once you have your DataFrame, writing to multiple outputs based on a particular key is simple. What's more, and this is the beauty of the DataFrame API, the code is pretty much the same across Python, Scala, Java and R:
people_df.write.partitionBy("number").text("people")
And you can easily use other output formats if you want:
people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")
In each of these examples, Spark will create a subdirectory for each of the keys that we partitioned the DataFrame by:
people/
  _SUCCESS
  number=1/
    part-abcd
    part-efgh
  number=2/
    part-abcd
    part-efgh
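A side note, not from the original answer: because the key ends up encoded in the directory names, Spark's partition discovery recovers it as a column when the output is read back. A minimal sketch, assuming a Spark 2.x SparkSession named spark:

// Read the whole partitioned output back; the number=... directories
// become a "number" column again via partition discovery.
val readBack = spark.read.text("people")
readBack.printSchema()
// root
//  |-- value: string (nullable = true)
//  |-- number: integer (nullable = true)

// Or read a single key's data by pointing at its subdirectory:
val onlyOnes = spark.read.text("people/number=1")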
zha*_*han 81
I would do it like this; it is scalable.
import org.apache.hadoop.io.NullWritable
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
      .map(a => (k, v)) // Your own implementation
      .partitionBy(new HashPartitioner(num))
      .saveAsHadoopFile("output/path", classOf[String], classOf[String],
        classOf[RDDMultipleTextOutputFormat])
    sc.stop()
  }
}
I just saw a similar answer above, but actually we don't need customized partitions. MultipleTextOutputFormat will create a file for each key. It is OK that multiple records with the same key fall into the same partition.
new HashPartitioner(num), where num is the number of partitions you want. In case you have a large number of different keys, you can set the number to a big value. In that case, each partition will not open too many HDFS file handles.
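A hedged variation, not from the original answer: if you would rather get a directory per key containing the usual part files, instead of a single file named after the key, you can append the default name to the path returned by generateFileNameForKeyValue. The class name below is made up:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputDirs extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // Produces output/path/<key>/part-00000 instead of output/path/<key>
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name
}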
Dan*_*bos 16
If you potentially have many values for a given key, I think the scalable solution is to write out one file per key per partition. Unfortunately there is no built-in support for this in Spark, but we can whip something up.
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
  .mapPartitionsWithIndex { (p, it) =>
    val outputs = new MultiWriter(p.toString)
    for ((k, v) <- it) {
      outputs.write(k.toString, v)
    }
    outputs.close
    Nil.iterator
  }
  .foreach((x: Nothing) => ()) // To trigger the job.

// This one is Local, but you could write one for HDFS
class MultiWriter(suffix: String) {
  private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
  def write(key: String, value: Any) = {
    if (!writers.contains(key)) {
      val f = new java.io.File("output/" + key + "/" + suffix)
      f.getParentFile.mkdirs
      writers(key) = new java.io.PrintWriter(f)
    }
    writers(key).println(value)
  }
  def close = writers.values.foreach(_.close)
}
(Replace PrintWriter with your choice of distributed filesystem operation.)
This makes a single pass over the RDD and performs no shuffle. It gives you one directory per key, with a number of files inside each.
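As noted above, the local PrintWriter can be swapped for a distributed filesystem writer. Below is a minimal sketch of an HDFS-backed variant using the Hadoop FileSystem API; the class name and the baseDir/key/suffix layout are my own choices, not from the answer:

import java.io.PrintWriter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// One file per key per partition under baseDir, e.g. baseDir/<key>/<partitionIndex>
class HdfsMultiWriter(baseDir: String, suffix: String) {
  private val fs = FileSystem.get(new Configuration())
  private val writers = collection.mutable.Map[String, PrintWriter]()

  def write(key: String, value: Any): Unit = {
    if (!writers.contains(key)) {
      // FileSystem.create makes the parent directories as needed
      writers(key) = new PrintWriter(fs.create(new Path(s"$baseDir/$key/$suffix")))
    }
    writers(key).println(value)
  }

  def close(): Unit = writers.values.foreach(_.close())
}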
This includes the codec as requested, the necessary imports, and the pimp as requested.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
  def writeAsMultiple(prefix: String, codec: String,
                      keyName: String = "key")
                     (implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    rdd.toDF(keyName, "_2").write.partitionBy(keyName)
      .format("text").option("codec", codec).save(prefix)
  }
}

val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
One subtle difference from the OP's request is that it will prefix <keyName>= to the directory names. E.g.
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
would give:
prefix/key=1/part-00000
prefix/key=2/part-00000
where prefix/key=1/part-00000 would contain the lines a and b, and prefix/key=2/part-00000 would contain the line c.
And
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")
would give:
prefix/foo=1/part-00000
prefix/foo=2/part-00000
It should be clear how to edit this for parquet.
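For what it's worth, here is a sketch of what the parquet edit could look like; this is my assumption rather than code from the answer, and it drops the text codec option since parquet has its own compression handling:

import org.apache.spark.sql.DataFrame

// df is assumed to already contain the key column (e.g. built with toDF as above)
def writeAsMultipleParquet(df: DataFrame, prefix: String, keyName: String = "key"): Unit =
  df.write.partitionBy(keyName).parquet(prefix)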
Finally, below is an example using Dataset, which is perhaps nicer than using tuples.
implicit class PimpedDataset[T](dataset: Dataset[T]) {
  def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
    dataset.write.partitionBy(field)
      .format("text").option("codec", codec).save(prefix)
  }
}
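Hypothetical usage of the Dataset version, assuming a SparkSession named spark and a made-up case class whose non-key column is a single string (as the text format requires); none of these names come from the answer:

import spark.implicits._ // spark is an existing SparkSession (assumption)

case class Record(key: Int, text: String)

val ds = Seq(Record(1, "a"), Record(1, "b"), Record(2, "c")).toDS()
// Writes prefix/key=1/... and prefix/key=2/... as gzip-compressed text
ds.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "key")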