如何根据列中的项目数对 DataFrame 进行分区。假设我们有一个包含 100 人的 DataFrame(列是first_name和country),我们想为一个国家的每 10 人创建一个分区。
如果我们的数据集包含来自中国的 80 人、来自法国的 15 人和来自古巴的 5 人,那么我们将需要 8 个中国分区、2 个法国分区和 1 个古巴分区。
这是不起作用的代码:
df.repartition($"country"): 这将为中国创建 1 个分区,为法国创建一个分区,为古巴创建一个分区df.repartition(8, $"country", rand):这会为每个国家创建最多 8 个分区,因此应该为中国创建 8 个分区,但法国和古巴分区未知。法国可能在 8 个分区中,而古巴最多可能在 5 个分区中。有关更多详细信息,请参阅此答案。这是repartition()文档:
当我查看该repartition()方法时,我什至没有看到采用三个参数的方法,因此看起来其中一些行为没有记录。
有没有办法动态设置每列的分区数?这将使创建分区数据集变得更容易。
由于 spark 对数据进行分区的方式,您将无法完全做到这一点。Spark 获取您在重新分区中指定的列,将该值散列到 64b 长的长度中,然后将该值乘以分区数。这样分区的数量是确定性的。它以这种方式工作的原因是,除了确保两侧的散列相同之外,连接还需要匹配连接左侧和右侧的分区数。
“我们希望为一个国家的每 10 个人创建一个分区。”
你到底想在这里完成什么?一个分区中只有 10 行对性能来说可能很糟糕。您是否正在尝试创建一个分区表,其中保证分区中的每个文件只有 x 行?
"df.repartition($"country"): 这将为中国创建 1 个分区,为法国创建一个分区,为古巴创建一个分区”
这实际上将创建一个数据框,其中包含按国家/地区散列的默认随机分区数
def repartition(partitionExprs: Column*): Dataset[T] = {
repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
}
Run Code Online (Sandbox Code Playgroud)
"df.repartition(8, $"country", rand):这会为每个国家创建最多8个分区,所以应该为中国创建8个分区,但法国和古巴分区未知。法国可能在8个分区中古巴最多可以分为 5 个分区。有关更多详细信息,请参阅此答案。”
像明智的那样,这是微妙的错误。只有 8 个分区,国家在这 8 个分区中基本上是随机洗牌的。
以下代码将为每个数据文件创建十行(示例数据集位于此处):
val outputPath = new java.io.File("./tmp/partitioned_lake5/").getCanonicalPath
df
.repartition(col("person_country"))
.write
.option("maxRecordsPerFile", 10)
.partitionBy("person_country")
.csv(outputPath)
Run Code Online (Sandbox Code Playgroud)
以下是 Spark 2.2 之前的代码,它将为每个数据文件创建大约十行:
val desiredRowsPerPartition = 10
val joinedDF = df
.join(countDF, Seq("person_country"))
.withColumn(
"my_secret_partition_key",
(rand(10) * col("count") / desiredRowsPerPartition).cast(IntegerType)
)
val outputPath = new java.io.File("./tmp/partitioned_lake6/").getCanonicalPath
joinedDF
.repartition(col("person_country"), col("my_secret_partition_key"))
.drop("count", "my_secret_partition_key")
.write
.partitionBy("person_country")
.csv(outputPath)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
15223 次 |
| 最近记录: |