PySpark,DataFrame的顶部

Jin*_*ing 4 dataframe apache-spark pyspark spark-dataframe

我想要做的是给出一个DataFrame,根据一些指定的列获取前n个元素.RDD API中的top(self,num)正是我想要的.我想知道DataFrame世界中是否有相同的API?

我的第一次尝试如下

def retrieve_top_n(df, n):
    # assume we want to get most popular n 'key' in DataFrame
    return df.groupBy('key').count().orderBy('count', ascending=False).limit(n).select('key')
Run Code Online (Sandbox Code Playgroud)

但是,我已经意识到这会导致非确定性行为(我不知道确切的原因,但我认为limit(n)不能保证采取哪个n)

Ped*_*rte 7

首先让我们定义一个生成测试数据的函数:

import numpy as np

def sample_df(num_records):
    def data():
      np.random.seed(42)
      while True:
          yield int(np.random.normal(100., 80.))

    data_iter = iter(data())
    df = sc.parallelize((
        (i, next(data_iter)) for i in range(int(num_records))
    )).toDF(('index', 'key_col'))

    return df

sample_df(1e3).show(n=5)
+-----+-------+
|index|key_col|
+-----+-------+
|    0|    139|
|    1|     88|
|    2|    151|
|    3|    221|
|    4|     81|
+-----+-------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)


现在,让我们提出三种不同的方法来计算TopK:

from pyspark.sql import Window
from pyspark.sql import functions


def top_df_0(df, key_col, K):
    """
    Using window functions.  Handles ties OK.
    """
    window = Window.orderBy(functions.col(key_col).desc())
    return (df
            .withColumn("rank", functions.rank().over(window))
            .filter(functions.col('rank') <= K)
            .drop('rank'))


def top_df_1(df, key_col, K):
    """
    Using limit(K). Does NOT handle ties appropriately.
    """
    return df.orderBy(functions.col(key_col).desc()).limit(K)


def top_df_2(df, key_col, K):
    """
    Using limit(k) and then filtering.  Handles ties OK."
    """
    num_records = df.count()
    value_at_k_rank = (df
                       .orderBy(functions.col(key_col).desc())
                       .limit(k)
                       .select(functions.min(key_col).alias('min'))
                       .first()['min'])

    return df.filter(df[key_col] >= value_at_k_rank)
Run Code Online (Sandbox Code Playgroud)

调用的函数top_df_1类似于您最初实现的函数.它给你非确定性行为的原因是因为它无法很好地处理关系.如果您有大量数据并且仅为了性能而对大致答案感兴趣,那么这可能是一件好事.


最后,让我们进行基准测试

对于基准测试,使用具有400万个条目的Spark DF并定义便利功能:

NUM_RECORDS = 4e6
test_df = sample_df(NUM_RECORDS).cache()

def show(func, df, key_col, K):
    func(df, key_col, K).select(
      functions.max(key_col),
      functions.min(key_col),
      functions.count(key_col)
    ).show()
Run Code Online (Sandbox Code Playgroud)


让我们看看判决结果:

%timeit show(top_df_0, test_df, "key_col", K=100)
+------------+------------+--------------+
|max(key_col)|min(key_col)|count(key_col)|
+------------+------------+--------------+
|         502|         420|           108|
+------------+------------+--------------+

1 loops, best of 3: 1.62 s per loop


%timeit show(top_df_1, test_df, "key_col", K=100)
+------------+------------+--------------+
|max(key_col)|min(key_col)|count(key_col)|
+------------+------------+--------------+
|         502|         420|           100|
+------------+------------+--------------+

1 loops, best of 3: 252 ms per loop


%timeit show(top_df_2, test_df, "key_col", K=100)
+------------+------------+--------------+
|max(key_col)|min(key_col)|count(key_col)|
+------------+------------+--------------+
|         502|         420|           108|
+------------+------------+--------------+

1 loops, best of 3: 725 ms per loop
Run Code Online (Sandbox Code Playgroud)

(请注意,top_df_0top_df_2具有在顶部100 108个条目这是由于绑条目的存在下的第100最好的,top_df_1是忽略了并列条目实现.).


底线

如果你想要一个确切的答案top_df_2(它比约好2倍top_df_0).如果你想要另外的x2性能,并且可以使用大致的答案top_df_1.