在pyspark中高效地以分布式方式生成大型DataFrame(无需pyspark.sql.Row)

Ale*_*rov 5 apache-spark pyspark apache-arrow pyarrow

问题归结为以下几点:我想使用现有的并行输入集合和一个函数在 pyspark 中生成一个 DataFrame,该函数给定一个输入可以生成一批相对较大的行。在下面的示例中,我想使用 1000 个执行器生成 10^12 行数据帧:

def generate_data(one_integer):
  import numpy as np
  from pyspark.sql import Row
  M = 10000000 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  row_type = Row("seed", "n", "x")
  return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]

N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(list_of_integers_rdd)
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
       StructField("seed", IntegerType()),
       StructField("n", IntegerType()),
       StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)
Run Code Online (Sandbox Code Playgroud)

(我真的不想研究给定种子的随机数的分布 - 这只是我能够想出的一个例子来说明大型数据帧不是从仓库加载而是由代码生成时的情况)

上面的代码几乎完全符合我的要求。问题是它以非常低效的方式完成此操作 - 代价是为每行创建一个 python Row 对象,然后将 python Row 对象转换为内部 Spark 柱状表示。

有没有一种方法可以np_array通过让 Spark 知道这些是一批值的列来转换已经以柱状表示形式存在的一批行(例如上面的一个或几个 numpy 数组)?

例如,我可以编写代码来生成 python 集合 RDD,其中每个元素都是 pyarrow.RecordBatch 或 pandas.DataFrame,但我找不到将其中任何一个转换为 Spark DataFrame 的方法,而无需在其中创建 pyspark Row 对象的 RDD的过程。

至少有十几篇文章举例说明了如何使用 pyarrow + pandas 将本地(到驱动程序)pandas 数据帧有效地转换为 Spark 数据帧,但这对我来说不是一个选择,因为我需要在在执行器上采用分布式方式,而不是在驱动器上生成一个 pandas 数据帧并将其发送给执行器。

UPD。 我找到了一种避免创建 Row 对象的方法 - 使用 python 元组的 RDD。正如预期的那样,它仍然太慢,但仍然比使用 Row 对象快一点。不过,这并不是我真正想要的(这是将列式数据从 python 传递到 Spark 的一种非常有效的方法)。

还测量了在机器上执行某些操作的时间(粗略的方法,测量时间有很大的变化,但在我看来仍然具有代表性):所讨论的数据集是 10M 行,3 列(一列是常量整数,其他列是是从 0 到 10M-1 的整数范围,第三个是使用以下命令生成的浮点值np.random.random_sample

  • 本地生成 pandas 数据帧(10M 行):~440-450ms
  • 本地生成spark.sql.Row对象的python列表(10M行):~12-15s
  • 本地生成代表行的元组的 python 列表(10M 行):~3.4-3.5s

仅使用 1 个执行器和 1 个初始种子值生成 Spark 数据帧:

  • 使用spark.createDataFrame(row_rdd, schema=my_schema):〜70-80秒
  • 使用spark.createDataFrame(tuple_rdd, schema=my_schema):~40-45秒
  • (非分布式创建)使用spark.createDataFrame(pandas_df, schema=my_schema):~0.4-0.5s(没有 pandas df 生成本身,大约需要相同的时间)- 设置spark.sql.execution.arrow.enabled为 true。

本地驱动程序 pandas 数据帧在约 1 秒内转换为 Spark 数据帧(10M 行)的示例让我有理由相信,在执行程序中生成的数据帧应该可以实现同样的效果。然而,使用 python 元组的 RDD,我现在能达到的最快速度是大约 40 秒(对于 10M 行)。

所以问题仍然存在 - 有没有一种方法可以在 pyspark 中以分布式方式有效地生成大型 Spark 数据帧?

ayp*_*lam 3

听起来瓶颈是从 RDD -> Dataframes 的转换,并且手头的函数相当快,并且 pandas DF 通过 pyarrow 到 Spark DF 的转换非常快。以下是两个可能的解决方案:

  1. 由于并行创建 pandas df 很容易,因此不必从执行器返回它,而是使用 编写生成的 df df.to_parquet,即:
def generate_data(seed):
    M = 10
    np.random.seed(seed)
    np_array = np.random.random_sample(M) # generates an array of M random values
    df = pd.DataFrame(np_array, columns=["x"])
    df["seed"] = seed
    df.reset_index().to_parquet(f"s3://bucket/part-{str(seed).zfill(5)}.parquet"
Run Code Online (Sandbox Code Playgroud)

之后 Spark 读取生成的镶木地板文件应该很简单。那么你的瓶颈就变成了 IO 限制,这应该比 Spark 转换元组/行类型更快。

  1. 如果您不允许将任何内容保存到文件中,pandas_udf并且GROUPED_MAP假设您的 Spark 版本足够新,则可能会帮助您。它也使用 pyarrow 在 Spark DF 和 pandas DF 之间进行转换,因此它应该比使用元组更快,并且允许您以分布式方式从 UDF 创建和返回 pandas DF。
import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

N = 10

df = spark.createDataFrame(
    [(i,) for i in range(N)], ["seed"]
)

def generate_data(seed):
    M = 10
    np.random.seed(seed)
    np_array = np.random.random_sample(M) # generates an array of M random values
    df = pd.DataFrame(np_array, columns=["x"])
    df["seed"] = seed
    return df.reset_index()

@pandas_udf("index long, x double, seed long", PandasUDFType.GROUPED_MAP)
def generate_data_udf(pdf):
    output = []
    for idx, row in pdf.iterrows():
        output.append(generate_data(row["seed"]))
    return pd.concat(output)


df.groupby("seed").apply(generate_data_udf).show()
Run Code Online (Sandbox Code Playgroud)

较慢的部分将是groupby您可能能够加速的部分,具体取决于您如何批量处理进入的种子generate_data_udf,即:

@udf(returnType=IntegerType())
def batch_seed(seed):
    return seed // 10

df.withColumn("batch_seed", batch_seed(col("seed"))). \
groupBy("batch_seed").apply(generate_data_udf).show()
Run Code Online (Sandbox Code Playgroud)