将自定义函数应用于spark数据帧组

gui*_*cgs 6 group-by dataset dataframe apache-spark pyspark

我有一个非常大的时间序列数据表,其中包含以下列:

  • 时间戳
  • LicensePlate
  • UberRide#
  • 速度

应考虑整组数据处理每个LicensePlate/UberRide数据集合.换句话说,我不需要逐行处理数据,而是将所有行按(LicensePlate/UberRide)一起分组.

我计划在数据帧api中使用spark,但我对如何在spark分组数据帧上执行自定义计算感到困惑.

我需要做的是:

  1. 获取所有数据
  2. 按一些列分组
  3. Foreach spark数据帧组应用af(x).返回自定义对象foreach组
  4. 通过应用g(x)并返回单个自定义对象来获取结果

我该怎么做第3步和第4步?我应该使用哪些关于spark API(数据帧,数据集,rdd,也许是pandas ......)的提示?

整个工作流程如下:

工作流程

zer*_*323 8

  • 虽然Spark提供了一些与Pandas集成的方法,但它并没有使Pandas分发.因此无论你在Spark中使用Pandas做什么都只是本地的(在转换中使用时是驱动程序或执行程序)操作.

    如果您正在寻找具有类似Pandas API的分布式系统,您应该看看dask.

  • 您可以定义用户定义聚合函数或Aggregators处理分组,Datasets但只能在Scala中直接访问API的这一部分.在创建Python包装器编写Python包装器并不难.
  • RDD API提供了大量的可用于在组执行操作开始低级别功能repartition/ repartitionAndSortWithinPartitions和与多个结束*byKey方法(combineByKey,groupByKey,reduceByKey等等).

    哪一个适用于您的情况取决于您要应用的函数的属性(它是关联的和可交换的,它可以在流上工作,它是否期望特定的顺序).

    最普遍但效率最低的方法可归纳如下:

    h(rdd.keyBy(f).groupByKey().mapValues(g).collect())
    
    Run Code Online (Sandbox Code Playgroud)

    其中f从值映射为key,g对应于每个组的聚集和h是的最终合并.大多数时候你可以做得比这更好,所以它应该只作为最后的手段使用.

  • 可以使用DataFrames/ Spark SQL和窗口函数表示相对复杂的逻辑.

  • 另请参阅在PySpark中对GroupedData应用UDF(具有正常运行的python示例)


Flo*_*t F 7

自Spark 2.3:Pandas矢量化UDF以来,您所寻找的是什么.它允许对DataFrame进行分组并对大熊猫应用自定义转换,并在每个组上分布:

df.groupBy("groupColumn").apply(myCustomPandasTransformation)
Run Code Online (Sandbox Code Playgroud)

它非常易于使用,所以我只是链接到Databricks的熊猫UDF演示文稿.

但是,我不知道在Scala中进行分组转换的实用方法,所以欢迎任何其他建议.

编辑:在Scala中,你可以使用数据集的groupByKey+ mapGroups/ 来实现与早期版本的Spark相同的功能flatMapGroups.