shu*_*put 8 scala apache-spark parquet apache-spark-sql spark-dataframe
我有一个生成的DataFrame,如下所示:
df.groupBy($"Hour", $"Category")
.agg(sum($"value").alias("TotalValue"))
.sort($"Hour".asc,$"TotalValue".desc))
Run Code Online (Sandbox Code Playgroud)
结果如下:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
Run Code Online (Sandbox Code Playgroud)
我想根据每个独特的价值作出新的dataframes col("Hour"),即
因此,所需的输出将是:
df0 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
+----+--------+----------+
df1 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
+----+--------+----------+
Run Code Online (Sandbox Code Playgroud)
同样
df2 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
+----+--------+----------+
Run Code Online (Sandbox Code Playgroud)
非常感谢您的帮助。
编辑1:
我尝试过的
df.foreach(
row => splitHour(row)
)
def splitHour(row: Row) ={
val Hour=row.getAs[Long]("Hour")
val HourDF= sparkSession.createDataFrame(List((s"$Hour",1)))
val hdf=HourDF.withColumnRenamed("_1","Hour_unique").drop("_2")
val mydf: DataFrame =df.join(hdf,df("Hour")===hdf("Hour_unique"))
mydf.write.mode("overwrite").parquet(s"/home/dev/shaishave/etc/myparquet/$Hour/")
}
Run Code Online (Sandbox Code Playgroud)
此策略的问题:
当它在df具有超过一百万行的数据帧上运行时,花了8个小时,并且在单个节点上为大约10 GB的RAM提供了火花作业。因此,join事实证明效率很低。
注意事项:我必须将每个数据帧编写mydf为镶木地板,该镶木具有需要维护(而不是扁平化)的嵌套架构。
如我的评论中所述,解决该问题的一种可能的简便方法是使用:
df.write.partitionBy("hour").saveAsTable("myparquet")
Run Code Online (Sandbox Code Playgroud)
如前所述,文件夹结构将是myparquet/hour=1,myparquet/hour=2......,myparquet/hour=24而不是myparquet/1,myparquet/2......, myparquet/24。
要更改文件夹结构,您可以
hcat.dynamic.partitioning.custom.pattern在显式的HiveContext中使用Hive配置设置;更多信息,请参见HCatalog DynamicPartitions。df.write.partitionBy.saveAsTable(...)命令后直接更改文件系统,例如从文件夹名称中for f in *; do mv $f ${f/${f:0:5}/} ; done删除Hour=文本。 重要的是要注意,通过更改文件夹的命名模式,当您spark.read.parquet(...)在该文件夹中运行时,Spark不会自动了解动态分区,因为它缺少partitionKey(即Hour)信息。
| 归档时间: |
|
| 查看次数: |
19722 次 |
| 最近记录: |