ast*_*asz 14 partitioning window-functions apache-spark
我想知道Spark是否知道镶木地板文件的分区键,并使用此信息来避免随机播放.
语境:
运行Spark 2.0.1运行本地SparkSession.我有一个csv数据集,我将其保存为我的磁盘上的镶木地板文件,如下所示:
val df0 = spark
.read
.format("csv")
.option("header", true)
.option("delimiter", ";")
.option("inferSchema", false)
.load("SomeFile.csv"))
val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)
df.write
.mode(SaveMode.Overwrite)
.format("parquet")
.option("inferSchema", false)
.save("SomeFile.parquet")
Run Code Online (Sandbox Code Playgroud)
我按列创建了42个分区numerocarte.这应该将多个组分组numerocarte到同一个分区.我write当时不想做partitionBy("numerocarte"),因为我不希望每张卡分区一个.它将是数百万.
之后在另一个脚本中,我读了这个SomeFile.parquet镶木地板文件并对其进行了一些操作.特别是我正在运行window function它,其中分区是在镶木地板文件被重新分区的同一列上完成的.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val df2 = spark.read
.format("parquet")
.option("header", true)
.option("inferSchema", false)
.load("SomeFile.parquet")
val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))
df2.withColumn("NewColumnName",
sum(col("dollars").over(w))
Run Code Online (Sandbox Code Playgroud)
在read我看到repartition按预期工作后,DataFrame df2有42个分区,每个分区都有不同的卡.
问题:
df2是按列分区的numerocarte?DataFrame?有这个命令吗?我知道如何检查分区数,但如何查看分区键?read和200个分区,之后withColumn建议Spark重新分配我的DataFrame.hi-*_*zir 12
Spark是否知道数据帧df2是由列numerocarte分区的?
它不是.
如果它不知道,我怎么告诉Spark数据已被右列分区?
你没有.仅仅因为您保存了已经洗牌的数据,并不意味着它将加载相同的拆分.
如何检查DataFrame的分区键?
没有分区键,一旦你加载的数据,但你可以检查queryExecution了Partitioner.
在实践中:
partitionBy方法DataFrameWriter.bucketByMetastore和persistent table.请参阅如何定义DataFrame的分区?详细的例子.
ast*_*asz 10
我正在回答自己的问题,以供将来参考。
按照@ user8371915的建议,bucketBy可以工作!
我正在保存我的DataFrame df:
df.write
.bucketBy(250, "userid")
.saveAsTable("myNewTable")
Run Code Online (Sandbox Code Playgroud)
然后,当我需要加载该表时:
val df2 = spark.sql("SELECT * FROM myNewTable")
val w = Window.partitionBy("userid")
val df3 = df2.withColumn("newColumnName", sum(col("someColumn")).over(w)
df3.explain
Run Code Online (Sandbox Code Playgroud)
我确认当我对df2分区的窗口功能执行操作时,userid没有乱码!谢谢@ user8371915!
我在调查时学到的一些知识
spark.read.format("parquet").load("path/to/myNewTable")但是以DataFrame这种方式创建的将不会保留原始分区!您必须使用spark.sql select以获得正确的分区DataFrame。spark.sql("describe formatted myNewTable").collect.foreach(println)。这将告诉您哪些列用于存储分区以及有多少个存储分区。.sortBy()并且排序也将保留在配置单元表中。df.write.bucketBy(250, "userid").sortBy("somColumnName").saveAsTable("myNewTable")myNewTable将保存到spark-warehouse我的本地Scala SBT项目中的文件夹中。在通过mesos通过集群模式spark-submit保存时,它将保存到配置单元仓库。对我来说,它位于/user/hive/warehouse。spark-submit您需要添加SparkSession两个选项:.config("hive.metastore.uris", "thrift://addres-to-your-master:9083")和.enableHiveSupport()。否则,您创建的配置单元表将不可见。spark.sql("USE your database")在进行存储之前进行。更新05-02-2018
我在进行火花存储和创建Hive表时遇到了一些问题。请参阅“ 为什么使用带有存储桶的Spark saveAsTable创建数以千计的文件?”中的问题,答复和评论。
| 归档时间: |
|
| 查看次数: |
11381 次 |
| 最近记录: |