使用窗口函数的 PySpark 数据偏度

Sre*_* TP 6 apache-spark pyspark

我有一个巨大的 PySpark 数据框,我正在通过我的键定义的分区执行一系列 Window 函数。

关键的问题是,我的分区因此而倾斜,导致事件时间线看起来像这样,

在此处输入图片说明

我知道我可以在加入时使用加盐技术来解决这个问题。但是当我使用 Window 函数时如何解决这个问题呢?

我在窗口函数中使用了滞后、领先等函数。我不能用加盐键来完成这个过程,因为我会得到错误的结果。

在这种情况下如何解决偏度?

我正在寻找一种动态方式来重新分区我的数据框而不会出现偏斜。

根据@jxc 的回答更新

我尝试创建一个示例 df 并尝试在其上运行代码,

df = pd.DataFrame()
df['id'] = np.random.randint(1, 1000, size=150000)
df['id'] = df['id'].map(lambda x: 100 if x % 2 == 0 else x)
df['timestamp'] = pd.date_range(start=pd.Timestamp('2020-01-01'), periods=len(df), freq='60s')
sdf = sc.createDataFrame(df)
sdf = sdf.withColumn("amt", F.rand()*100)
w = Window.partitionBy("id").orderBy("timestamp")

sdf = sdf.withColumn("new_col", F.lag("amt").over(w) + F.lead("amt").over(w))
x = sdf.toPandas()
Run Code Online (Sandbox Code Playgroud)

这给了我这样的事件时间表,

在此处输入图片说明

我尝试了@jxc 答案中的代码,

sdf = sc.createDataFrame(df)
sdf = sdf.withColumn("amt", F.rand()*100)

N = 24*3600*365*2
sdf_1 = sdf.withColumn('pid', F.ceil(F.unix_timestamp('timestamp')/N))

w1 = Window.partitionBy('id', 'pid').orderBy('timestamp')
w2 = Window.partitionBy('id', 'pid')

sdf_2 = sdf_1.select(
    '*',
    F.count('*').over(w2).alias('cnt'),
    F.row_number().over(w1).alias('rn'),
    (F.lag('amt',1).over(w1) + F.lead('amt',1).over(w1)).alias('new_val')
)

sdf_3 = sdf_2.filter('rn in (1, 2, cnt-1, cnt)') \
    .withColumn('new_val', F.lag('amt',1).over(w) + F.lead('amt',1).over(w)) \
    .filter('rn in (1,cnt)')
    
df_new = sdf_2.filter('rn not in (1,cnt)').union(sdf_3)

x = df_new.toPandas()
Run Code Online (Sandbox Code Playgroud)

我结束了一个额外的阶段,事件时间线看起来更倾斜,

在此处输入图片说明

新代码也增加了运行时间

jxc*_*jxc 10

要处理大分区,您可以尝试根据orderBy列(很可能是数字列或可以转换为数字的日期/时间戳列)对其进行拆分,以便所有新的子分区保持正确的行顺序。使用新分区器处理行,并且为了使用laglead函数进行计算,仅需要对子分区之间边界周围的行进行后处理。(下面也讨论了task-2中如何合并小分区)

使用您的示例sdf并假设我们有以下 WinSpec 和一个简单的聚合函数:

w = Window.partitionBy('id').orderBy('timestamp')
df.withColumn('new_amt', F.lag('amt',1).over(w) + F.lead('amt',1).over(w))
Run Code Online (Sandbox Code Playgroud)

任务1:分割大分区:

请尝试以下操作:

  1. 选择N来分割时间戳并设置额外的分区By 列pid(使用ceilintfloor):

    # N to cover 35-days' intervals
    N = 24*3600*35
    df1 = sdf.withColumn('pid', F.ceil(F.unix_timestamp('timestamp')/N))
    
    Run Code Online (Sandbox Code Playgroud)
  2. pid添加到partitionBy(参见w1),然后添加 calaulte row_number()lag()最后lead()添加到w1。还可以找到每个新分区中的行数 ( cnt ),以帮助识别分区的末尾 ( rn == cnt)。生成的new_val对于大多数行来说都很好,除了每个分区边界上的行。

    w1 = Window.partitionBy('id', 'pid').orderBy('timestamp')
    w2 = Window.partitionBy('id', 'pid')
    
    df2 = df1.select(
        '*',
        F.count('*').over(w2).alias('cnt'),
        F.row_number().over(w1).alias('rn'),
        (F.lag('amt',1).over(w1) + F.lead('amt',1).over(w1)).alias('new_amt')
    )
    
    Run Code Online (Sandbox Code Playgroud)

    df2下面是显示边界行的示例。

    在此输入图像描述

  3. 处理边界:选择边界上的行rn in (1, cnt)加上计算中使用的值的行,对w进行相同的new_valrn in (2, cnt-1)计算,并仅保存边界行的结果。

    df3 = df2.filter('rn in (1, 2, cnt-1, cnt)') \
        .withColumn('new_amt', F.lag('amt',1).over(w) + F.lead('amt',1).over(w)) \
        .filter('rn in (1,cnt)')
    
    Run Code Online (Sandbox Code Playgroud)

    下面显示了上面df2的结果df3

    在此输入图像描述

  4. 将df3合并回df2以更新边界行rn in (1,cnt)

    df_new = df2.filter('rn not in (1,cnt)').union(df3)
    
    Run Code Online (Sandbox Code Playgroud)

    下面的屏幕截图显示了边界行周围的最终df_new :

    在此输入图像描述

    # drop columns which are used to implement logic only
    df_new = df_new.drop('cnt', 'rn')
    
    Run Code Online (Sandbox Code Playgroud)

一些注意事项:

  1. 定义了以下3个WindowSpec:

    w = Window.partitionBy('id').orderBy('timestamp')          <-- fix boundary rows
    w1 = Window.partitionBy('id', 'pid').orderBy('timestamp')  <-- calculate internal rows
    w2 = Window.partitionBy('id', 'pid')                       <-- find #rows in a partition
    
    Run Code Online (Sandbox Code Playgroud)

    注意:严格来说,我们最好使用以下内容来修复边界行,以避免边界周围w出现问题。timestamp

    w = Window.partitionBy('id').orderBy('pid', 'rn')          <-- fix boundary rows
    
    Run Code Online (Sandbox Code Playgroud)
  2. 如果您知道哪些分区是倾斜的,只需划分它们并跳过其他分区即可。现有的方法可能会将一个小分区分成 2 个甚至更多(如果它们分布稀疏)

    df1 = df.withColumn('pid', F.when(F.col('id').isin('a','b'), F.ceil(F.unix_timestamp('timestamp')/N)).otherwise(1))
    
    Run Code Online (Sandbox Code Playgroud)

    如果对于每个分区,您可以检索count(行数) 和min_ts=min(时间戳),然后尝试更动态的操作pid(以下M是要拆分的行数阈值):

    F.expr(f"IF(count>{M}, ceil((unix_timestamp(timestamp)-unix_timestamp(min_ts))/{N}), 1)")
    
    Run Code Online (Sandbox Code Playgroud)

    注意:对于分区内的偏度,将需要更复杂的函数来生成pid.

  3. 如果仅lag(1)使用函数,则仅对左边界进行后处理、过滤rn in (1, cnt)并仅更新rn == 1

    df3 = df1.filter('rn in (1, cnt)') \
        .withColumn('new_amt', F.lag('amt',1).over(w)) \
        .filter('rn = 1')
    
    Run Code Online (Sandbox Code Playgroud)

    类似于引导函数,当我们只需要修复右边界并更新时rn == cnt

  4. 如果仅lag(2)使用,则使用以下命令过滤并更新更多行df3

    df3 = df1.filter('rn in (1, 2, cnt-1, cnt)') \
        .withColumn('new_amt', F.lag('amt',2).over(w)) \
        .filter('rn in (1,2)')
    
    Run Code Online (Sandbox Code Playgroud)

    您可以将相同的方法扩展到具有不同偏移量的混合lag情况lead

任务2:合并小分区:

根据分区 中的记录数count,我们可以设置一个阈值M,以便如果count>M,则id拥有自己的分区,否则我们合并分区,使 #of 总记录数小于M(下面的方法有一个边缘情况2*M-2)。

M = 20000

# create pandas df with columns `id`, `count` and `f`, sort rows so that rows with count>=M are located on top
d2 = pd.DataFrame([ e.asDict() for e in sdf.groupby('id').count().collect() ]) \
    .assign(f=lambda x: x['count'].lt(M)) \
    .sort_values('f')    

# add pid column to merge smaller partitions but the total row-count in partition should be less than or around M 
# potentially there could be at most `2*M-2` records for the same pid, to make sure strictly count<M, use a for-loop to iterate d1 and set pid:
d2['pid'] = (d2.mask(d2['count'].gt(M),M)['count'].shift(fill_value=0).cumsum()/M).astype(int)

# add pid to sdf. In case join is too heavy, try using Map
sdf_1 = sdf.join(spark.createDataFrame(d2).alias('d2'), ["id"]) \
    .select(sdf["*"], F.col("d2.pid"))

# check pid: # of records and # of distinct ids
sdf_1.groupby('pid').agg(F.count('*').alias('count'), F.countDistinct('id').alias('cnt_ids')).orderBy('pid').show()
+---+-----+-------+                                                             
|pid|count|cnt_ids|
+---+-----+-------+
|  0|74837|      1|
|  1|20036|    133|
|  2|20052|    134|
|  3|20010|    133|
|  4|15065|    100|
+---+-----+-------+
Run Code Online (Sandbox Code Playgroud)

现在,新的窗口应该单独按pid进行分区,并将id移动到orderBy,如下所示:

w3 = Window.partitionBy('pid').orderBy('id','timestamp')
Run Code Online (Sandbox Code Playgroud)

根据上述w3 WinSpec 定制 lag/lead 函数,然后计算new_val

lag_w3  = lambda col,n=1: F.when(F.lag('id',n).over(w3) == F.col('id'), F.lag(col,n).over(w3))
lead_w3 = lambda col,n=1: F.when(F.lead('id',n).over(w3) == F.col('id'), F.lead(col,n).over(w3))

sdf_new = sdf_1.withColumn('new_val', lag_w3('amt',1) + lead_w3('amt',1))
Run Code Online (Sandbox Code Playgroud)