Spark 按列重新分区,每列具有动态分区数

Pow*_*ers 6 apache-spark

如何根据列中的项目数对 DataFrame 进行分区。假设我们有一个包含 100 人的 DataFrame(列是first_namecountry),我们想为一个国家的每 10 人创建一个分区。

如果我们的数据集包含来自中国的 80 人、来自法国的 15 人和来自古巴的 5 人,那么我们将需要 8 个中国分区、2 个法国分区和 1 个古巴分区。

这是不起作用的代码:

  • df.repartition($"country"): 这将为中国创建 1 个分区,为法国创建一个分区,为古巴创建一个分区
  • df.repartition(8, $"country", rand):这会为每个国家创建最多 8 个分区,因此应该为中国创建 8 个分区,但法国和古巴分区未知。法国可能在 8 个分区中,而古巴最多可能在 5 个分区中。有关更多详细信息,请参阅此答案

这是repartition()文档:

重新分区文件

当我查看该repartition()方法时,我什至没有看到采用三个参数的方法,因此看起来其中一些行为没有记录。

有没有办法动态设置每列的分区数?这将使创建分区数据集变得更容易。

And*_*ong 5

由于 spark 对数据进行分区的方式,您将无法完全做到这一点。Spark 获取您在重新分区中指定的列,将该值散列到 64b 长的长度中,然后将该值乘以分区数。这样分区的数量是确定性的。它以这种方式工作的原因是,除了确保两侧的散列相同之外,连接还需要匹配连接左侧和右侧的分区数。

“我们希望为一个国家的每 10 个人创建一个分区。”

你到底想在这里完成什么?一个分区中只有 10 行对性能来说可能很糟糕。您是否正在尝试创建一个分区表,其中保证分区中的每个文件只有 x 行?

"df.repartition($"country"): 这将为中国创建 1 个分区,为法国创建一个分区,为古巴创建一个分区”

这实际上将创建一个数据框,其中包含按国家/地区散列的默认随机分区数

  def repartition(partitionExprs: Column*): Dataset[T] = {
    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
  }
Run Code Online (Sandbox Code Playgroud)

"df.repartition(8, $"country", rand):这会为每个国家创建最多8个分区,所以应该为中国创建8个分区,但法国和古巴分区未知。法国可能在8个分区中古巴最多可以分为 5 个分区。有关更多详细信息,请参阅此答案。”

像明智的那样,这是微妙的错误。只有 8 个分区,国家在这 8 个分区中基本上是随机洗牌的。


Pow*_*ers 5

以下代码将为每个数据文件创建十行(示例数据集位于此处):

val outputPath = new java.io.File("./tmp/partitioned_lake5/").getCanonicalPath
df
  .repartition(col("person_country"))
  .write
  .option("maxRecordsPerFile", 10)
  .partitionBy("person_country")
  .csv(outputPath)
Run Code Online (Sandbox Code Playgroud)

以下是 Spark 2.2 之前的代码,它将为每个数据文件创建大约十行:

val desiredRowsPerPartition = 10

val joinedDF = df
  .join(countDF, Seq("person_country"))
  .withColumn(
    "my_secret_partition_key",
    (rand(10) * col("count") / desiredRowsPerPartition).cast(IntegerType)
  )

val outputPath = new java.io.File("./tmp/partitioned_lake6/").getCanonicalPath
joinedDF
  .repartition(col("person_country"), col("my_secret_partition_key"))
  .drop("count", "my_secret_partition_key")
  .write
  .partitionBy("person_country")
  .csv(outputPath)
Run Code Online (Sandbox Code Playgroud)