如何在 Pyspark 中使用盐化技术进行倾斜聚合。
假设我们有倾斜的数据,如下所示,如何创建盐列并在聚合中使用它。
| 城市 | 状态 | 数数 |
|---|---|---|
| 拉琼 | 锡金 | 3,000 |
| 让波 | 锡金 | 50,000 |
| 甘托克 | 锡金 | 3,00,000 |
| 班加罗尔 | 卡纳塔克邦 | 2,50,00,000 |
| 孟买 | 马哈拉施特拉邦 | 2,90,00,000 |
要对倾斜数据使用加盐技术,我们需要创建一个名为“盐”的列。生成一个范围从 0 到 (spark.sql.shuffle.partitions - 1) 的随机编号。
表应如下所示,其中“salt”列的值从 0 到 199(在本例中分区大小为 200)。现在您可以对“城市”、“州”、“盐”使用 groupBy。
| 城市 | 状态 | 盐 |
|---|---|---|
| 拉琼 | 锡金 | 151 |
| 拉琼 | 锡金 | 102 |
| 拉琼 | 锡金 | 16 |
| 让波 | 锡金 | 5 |
| 让波 | 锡金 | 19 |
| 让波 | 锡金 | 16 |
| 让波 | 锡金 | 102 |
| 甘托克 | 锡金 | 55 |
| 甘托克 | 锡金 | 119 |
| 甘托克 | 锡金 | 16 |
| 甘托克 | 锡金 | 10 |
| 班加罗尔 | 卡纳塔克邦 | 19 |
| 孟买 | 马哈拉施特拉邦 | 0 |
| 班加罗尔 | 卡纳塔克邦 | 199 |
| 孟买 | 马哈拉施特拉邦 | 190 |
代码:
from pyspark.sql import SparkSession, functions as f
from pyspark.sql.types import (
StructType, StructField, IntegerType
)
salval = f.round(f.rand() * int(spark.conf.get("spark.sql.shuffle.partitions")) -1 )
record_df.withColumn("salt", f.lit(salval).cast(IntegerType()))\
.groupBy("city", "state", "salt")\
.agg(
f.count("city")
)\
.drop("salt")
Run Code Online (Sandbox Code Playgroud)
输出:
| 城市 | 状态 | 数数 |
|---|---|---|
| 拉琼 | 锡金 | 3,000 |
| 让波 | 锡金 | 50,000 |
| 甘托克 | 锡金 | 3,00,000 |
| 班加罗尔 | 卡纳塔克邦 | 2,50,00,000 |
| 孟买 | 马哈拉施特拉邦 | 2,90,00,000 |
| 归档时间: |
|
| 查看次数: |
4065 次 |
| 最近记录: |