sat*_*kum 4 scala amazon-s3 dataframe apache-spark qubole
我从s3文件输入以下DataFrame,需要将数据转换为以下所需的输出.我使用Spark版本1.5.1和Scala,但可以用Python改为Spark.欢迎任何建议.
DataFrame输入:
name animal data
john mouse aaaaa
bob mouse bbbbb
bob mouse ccccc
bob dog ddddd
Run Code Online (Sandbox Code Playgroud)
期望的输出:
john/mouse/file.csv
bob/mouse/file.csv
bob/dog/file.csv
terminal$ cat bob/mouse/file.csv
bbbbb
ccccc
terminal$ cat bob/dog/file.csv
ddddd
Run Code Online (Sandbox Code Playgroud)
这是我尝试过的现有Spark Scala代码:
val sc = new SparkContext(new SparkConf())
val sqlc = new org.apache.spark.sql.SQLContext(sc)
val df = sqlc.read.json("raw.gz")
val cols = Seq("name", "animal")
df.groupBy(cols.head, cols.tail: _*).count().take(100).foreach(println)
Run Code Online (Sandbox Code Playgroud)
电流输出:
[john,mouse,1]
[bob,mouse,2]
[bob,dog,1]
Run Code Online (Sandbox Code Playgroud)
我现有代码的一些问题是groupBy返回一个GroupedData对象,我可能不想对该数据执行count/sum/agg函数.我正在寻找一种更好的技术来分组和输出数据.数据集非常大.
这可以使用partitionBy
选项来实现DataFrameWriter
.一般语法如下:
df.write.partitionBy("name", "animal").format(...).save(...)
Run Code Online (Sandbox Code Playgroud)
不幸的是,支持Spark 1.5中分区的唯一纯文本格式是JSON.
如果您可以将Spark安装更新为:
partitionBy
与text
格式.如果您需要group(repartition
)的单个输出文件,则还需要1.6 .partitionBy
与csv
格式.我相信在1.5中你最好的选择是将文件写为JSON并转换单个输出文件.
如果distinct的name', 'animals
数量很小,您可以尝试为每个组执行单独的写入:
val dist = df.select("name", "animal").rdd.collect.map {
case Row(name: String, animal: String) => (name, animal)
}
for {
(name, animal) <- dist
} df.where($"name" === name && $"animal" === animal)
.select($"data").write.format("csv").save(s"/prefix/$name/$animal")
Run Code Online (Sandbox Code Playgroud)
但是当组合数量增加时,这不会扩展.
归档时间: |
|
查看次数: |
1545 次 |
最近记录: |