Pyspark:如何对倾斜聚合使用盐化技术

Pra*_*rma 1 salt pyspark

如何在 Pyspark 中使用盐化技术进行倾斜聚合。

假设我们有倾斜的数据,如下所示,如何创建盐列并在聚合中使用它。

城市 状态 数数
拉琼 锡金 3,000
让波 锡金 50,000
甘托克 锡金 3,00,000
班加罗尔 卡纳塔克邦 2,50,00,000
孟买 马哈拉施特拉邦 2,90,00,000

Pra*_*rma 5

要对倾斜数据使用加盐技术,我们需要创建一个名为“盐”的列。生成一个范围从 0 到 (spark.sql.shuffle.partitions - 1) 的随机编号。

表应如下所示,其中“salt”列的值从 0 到 199(在本例中分区大小为 200)。现在您可以对“城市”、“州”、“盐”使用 groupBy。

城市 状态
拉琼 锡金 151
拉琼 锡金 102
拉琼 锡金 16
让波 锡金 5
让波 锡金 19
让波 锡金 16
让波 锡金 102
甘托克 锡金 55
甘托克 锡金 119
甘托克 锡金 16
甘托克 锡金 10
班加罗尔 卡纳塔克邦 19
孟买 马哈拉施特拉邦 0
班加罗尔 卡纳塔克邦 199
孟买 马哈拉施特拉邦 190

代码:

from pyspark.sql import SparkSession, functions as f
from pyspark.sql.types import (
    StructType, StructField, IntegerType
)

salval = f.round(f.rand() * int(spark.conf.get("spark.sql.shuffle.partitions")) -1 )

record_df.withColumn("salt", f.lit(salval).cast(IntegerType()))\
    .groupBy("city", "state", "salt")\
    .agg(
      f.count("city")
    )\
    .drop("salt")
Run Code Online (Sandbox Code Playgroud)

输出:

城市 状态 数数
拉琼 锡金 3,000
让波 锡金 50,000
甘托克 锡金 3,00,000
班加罗尔 卡纳塔克邦 2,50,00,000
孟买 马哈拉施特拉邦 2,90,00,000