小编Dav*_*ave的帖子

没有 GroupBy 的 Pyspark SQL Pandas 分组地图?

我有一个数据集,我想在 AWS EMR 中的临时集群上运行的更大 ETL 过程的不同阶段使用多个 Pyspark SQL Grouped Map UDF进行映射。Grouped Map API 要求在应用之前对 Pyspark 数据帧进行分组,但我实际上不需要对键进行分组。

目前,我正在使用任意分组,该分组有效,但导致:

  1. 不必要的洗牌。

  2. 每个作业中任意 groupby 的 hacky 代码。

我的理想解决方案允许在没有任意分组的情况下应用矢量化 Pandas UDF,但是如果我可以保存至少可以消除无序的任意分组。

编辑

这是我的代码的样子。我最初使用的是任意分组,但目前正在spark_partition_id()根据@pault 的评论进行尝试。


@pandas_udf(b_schema, PandasUDFType.GROUPED_MAP)
def transform(a_partition):
  b = a_partition.drop("pid", axis=1)
  # Some other transform stuff
  return b

(sql
  .read.parquet(a_path)
  .withColumn("pid", spark_partition_id())
  .groupBy("pid")
  .apply(transform)
  .write.parquet(b_path))

Run Code Online (Sandbox Code Playgroud)

使用spark_partition_id()似乎仍然会导致洗牌。我得到以下 DAG:

阶段1

  1. 扫描拼花
  2. 项目
  3. 项目
  4. 交换

第二阶段

  1. 交换
  2. 种类
  3. FlatMapGroupsInPandas

python pandas apache-spark pyspark pyspark-sql

9
推荐指数
1
解决办法
880
查看次数

标签 统计

apache-spark ×1

pandas ×1

pyspark ×1

pyspark-sql ×1

python ×1