Spark知道DataFrame的分区键吗?

ast*_*asz 14 partitioning window-functions apache-spark

我想知道Spark是否知道镶木地板文件的分区键,并使用此信息来避免随机播放.

语境:

运行Spark 2.0.1运行本地SparkSession.我有一个csv数据集,我将其保存为我的磁盘上的镶木地板文件,如下所示:

val df0 = spark
  .read
  .format("csv")
  .option("header", true)
  .option("delimiter", ";")
  .option("inferSchema", false)
  .load("SomeFile.csv"))


val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)

df.write
  .mode(SaveMode.Overwrite)
  .format("parquet")
  .option("inferSchema", false)
  .save("SomeFile.parquet")
Run Code Online (Sandbox Code Playgroud)

我按列创建了42个分区numerocarte.这应该将多个组分组numerocarte到同一个分区.我write当时不想做partitionBy("numerocarte"),因为我不希望每张卡分区一个.它将是数百万.

之后在另一个脚本中,我读了这个SomeFile.parquet镶木地板文件并对其进行了一些操作.特别是我正在运行window function它,其中分区是在镶木地板文件被重新分区的同一列上完成的.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df2 = spark.read
  .format("parquet")
  .option("header", true)
  .option("inferSchema", false)
  .load("SomeFile.parquet")

val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))

df2.withColumn("NewColumnName",
      sum(col("dollars").over(w))
Run Code Online (Sandbox Code Playgroud)

read我看到repartition按预期工作后,DataFrame df2有42个分区,每个分区都有不同的卡.

问题:

  1. Spark是否知道数据框df2是按列分区的numerocarte
  2. 如果它知道,那么窗口函数中将没有随机播放.真正?
  3. 如果它不知道,它将在窗口函数中进行随机播放.真正?
  4. 如果它不知道,我怎么告诉Spark数据已被右列分区?
  5. 如何查看分区键DataFrame?有这个命令吗?我知道如何检查分区数,但如何查看分区键?
  6. 当我在每个步骤之后打印文件中的分区数量时,我有42个分区read和200个分区,之后withColumn建议Spark重新分配我的DataFrame.
  7. 如果我有两个不同的表重新分配相同的列,联接会使用该信息吗?

hi-*_*zir 12

Spark是否知道数据帧df2是由列numerocarte分区的?

它不是.

如果它不知道,我怎么告诉Spark数据已被右列分区?

你没有.仅仅因为您保存了已经洗牌的数据,并不意味着它将加载相同的拆分.

如何检查DataFrame的分区键?

没有分区键,一旦你加载的数据,但你可以检查queryExecutionPartitioner.


在实践中:

  • 如果您想支持密钥的有效下推,请使用partitionBy方法DataFrameWriter.
  • 如果您希望对连接优化提供有限的支持,请使用bucketByMetastore和persistent table.

请参阅如何定义DataFrame的分区?详细的例子.


ast*_*asz 10

我正在回答自己的问题,以供将来参考。

按照@ user8371915的建议,bucketBy可以工作!

我正在保存我的DataFrame df

df.write
  .bucketBy(250, "userid")
  .saveAsTable("myNewTable")
Run Code Online (Sandbox Code Playgroud)

然后,当我需要加载该表时:

val df2 = spark.sql("SELECT * FROM myNewTable")

val w = Window.partitionBy("userid")

val df3 = df2.withColumn("newColumnName", sum(col("someColumn")).over(w)
df3.explain
Run Code Online (Sandbox Code Playgroud)

我确认当我对df2分区的窗口功能执行操作时,userid没有乱码!谢谢@ user8371915!

我在调查时学到的一些知识

  • myNewTable看起来像普通的实木复合地板文件,但事实并非如此。您可以使用正常阅读它,spark.read.format("parquet").load("path/to/myNewTable")但是以DataFrame这种方式创建的将不会保留原始分区!您必须使用spark.sql select以获得正确的分区DataFrame
  • 您可以使用来查看表格内部spark.sql("describe formatted myNewTable").collect.foreach(println)。这将告诉您哪些列用于存储分区以及有多少个存储分区。
  • 利用分区功能的窗口函数和联接通常也需要排序。您可以在写入时使用来对存储桶中的数据进行排序,.sortBy()并且排序也将保留在配置单元表中。df.write.bucketBy(250, "userid").sortBy("somColumnName").saveAsTable("myNewTable")
  • 在本地模式下工作时,表myNewTable将保存到spark-warehouse我的本地Scala SBT项目中的文件夹中。在通过mesos通过集群模式spark-submit保存时,它将保存到配置单元仓库。对我来说,它位于/user/hive/warehouse
  • 这样做时,spark-submit您需要添加SparkSession两个选项:.config("hive.metastore.uris", "thrift://addres-to-your-master:9083").enableHiveSupport()。否则,您创建的配置单元表将不可见。
  • 如果要将表保存到特定数据库,请spark.sql("USE your database")在进行存储之前进行。

更新05-02-2018

我在进行火花存储和创建Hive表时遇到了一些问题。请参阅“ 为什么使用带有存储桶的Spark saveAsTable创建数以千计的文件?”中的问题,答复和评论