在PySpark中的GroupedData上应用UDF(具有正常运行的python示例)

aro*_*r09 27 python user-defined-functions apache-spark apache-spark-sql pyspark

我有这个在pandas数据帧中本地运行的python代码:

df_result = pd.DataFrame(df
                          .groupby('A')
                          .apply(lambda x: myFunction(zip(x.B, x.C), x.name))
Run Code Online (Sandbox Code Playgroud)

我想在PySpark中运行它,但在处理pyspark.sql.group.GroupedData对象时遇到问题.

我尝试过以下方法:

sparkDF
 .groupby('A')
 .agg(myFunction(zip('B', 'C'), 'A')) 
Run Code Online (Sandbox Code Playgroud)

返回

KeyError: 'A'
Run Code Online (Sandbox Code Playgroud)

我推测因为'A'不再是一列而我找不到x.name的等价物.

然后

sparkDF
 .groupby('A')
 .map(lambda row: Row(myFunction(zip('B', 'C'), 'A'))) 
 .toDF()
Run Code Online (Sandbox Code Playgroud)

但是得到以下错误:

AttributeError: 'GroupedData' object has no attribute 'map'
Run Code Online (Sandbox Code Playgroud)

任何建议将非常感谢!

Rya*_*ier 39

您尝试的是编写UDAF(用户定义聚合函数)而不是UDF(用户定义函数).UDAF是处理按密钥分组的数据的函数.具体来说,他们需要定义如何在单个分区中合并组中的多个值,然后如何跨分区合并键的结果.目前在python中没有办法实现UDAF,它们只能在Scala中实现.

但是,您可以在Python中解决它.您可以使用收集集来收集分组值,然后使用常规UDF来执行您想要的操作.唯一需要注意的是collect_set仅适用于原始值,因此您需要将它们编码为字符串.

from pyspark.sql.types import StringType
from pyspark.sql.functions import col, collect_list, concat_ws, udf

def myFunc(data_list):
    for val in data_list:
        b, c = data.split(',')
        # do something

    return <whatever>

myUdf = udf(myFunc, StringType())

df.withColumn('data', concat_ws(',', col('B'), col('C'))) \
  .groupBy('A').agg(collect_list('data').alias('data'))
  .withColumn('data', myUdf('data'))
Run Code Online (Sandbox Code Playgroud)

如果要进行重复数据删除,请使用collect_set.此外,如果您的某些密钥有很多值,这将会很慢,因为密钥的所有值都需要在集群中的某个分区中收集.如果你的最终结果是你通过以某种方式组合每个键的值来构建的值(例如将它们相加),那么使用RDD aggregateByKey方法实现它可能会更快,它允许你为分区中的每个键构建一个中间值.改组数据.

编辑:11/21/2018

由于这个答案是写的,pyspark使用Pandas增加了对UDAF的支持.使用Panda的UDF和UDAF比使用RDD的直接python函数有一些不错的性能改进.在引擎盖下,它会对列进行矢量化(将多行中的值批处理在一起以优化处理和压缩).请查看此处以获得更好的解释,或者查看user6910411的答案以获取示例.


use*_*411 36

从Spark 2.3开始就可以使用了pandas_udf.GROUPED_MAPCallable[[pandas.DataFrame], pandas.DataFrame]或者换句话说,从熊猫映射的函数DataFrame相同的形状作为输入的,到输出DataFrame.

例如,如果数据如下所示:

df = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2")
)
Run Code Online (Sandbox Code Playgroud)

并且您想要计算成对min之间的平均值value1 value2,您必须定义输出模式:

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_min", DoubleType())
])
Run Code Online (Sandbox Code Playgroud)

pandas_udf:

import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    result = pd.DataFrame(df.groupby(df.key).apply(
        lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean()
    ))
    result.reset_index(inplace=True, drop=False)
    return result
Run Code Online (Sandbox Code Playgroud)

并应用它:

df.groupby("key").apply(g).show()
Run Code Online (Sandbox Code Playgroud)
+---+-------+
|key|avg_min|
+---+-------+
|  b|   -1.5|
|  a|   -0.5|
+---+-------+
Run Code Online (Sandbox Code Playgroud)

排除模式定义和装饰器,您可以按原样应用当前的Pandas代码.

由于火花2.4.0也有GROUPED_AGG变型,这需要Callable[[pandas.Series, ...], T],其中T是一个原始标量:

import numpy as np

@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def f(x, y):
    return np.minimum(x, y).mean()
Run Code Online (Sandbox Code Playgroud)

可与标准group_by/ agg构造一起使用:

df.groupBy("key").agg(f("value1", "value2").alias("avg_min")).show()
Run Code Online (Sandbox Code Playgroud)
+---+-------+
|key|avg_min|
+---+-------+
|  b|   -1.5|
|  a|   -0.5|
+---+-------+
Run Code Online (Sandbox Code Playgroud)

请注意,既不GROUPED_MAP也不GROUPPED_AGG pandas_udf具有相同的行为方式UserDefinedAggregateFunction或者Aggregator,它是更接近groupByKey或窗口功能与无限的框架.首先对数据进行混洗,然后仅应用UDF.

为了优化执行,您应该实现ScalaUserDefinedAggregateFunction添加Python包装器.

另请参阅PySpark中应用于Window的用户定义函数?

  • 这不行。我有 Spark 2.4,并且收到“作业因阶段失败而中止” (3认同)

Jan*_*azz 8

PySpark 3.0.0 版本中的另一个扩展: applyInPandas

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], 
                            ("id", "v"))  

def mean_func(key, pdf):
   # key is a tuple of one numpy.int64, which is the value
   # of 'id' for the current group
   return pd.DataFrame([key + (pdf.v.mean(),)])

df.groupby('id').applyInPandas(mean_func, schema="id long, v double").show() 
Run Code Online (Sandbox Code Playgroud)

结果是:

+---+---+
| id|  v|
+---+---+
|  1|1.5|
|  2|6.0|
+---+---+
Run Code Online (Sandbox Code Playgroud)

有关更多详细信息,请参阅:https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.GroupedData.applyInPandas.html


May*_*gar 7

我将超越答案。

因此,您可以使用@pandas_udf在pyspark中实现类似pandas.groupby()。apply的逻辑,这是矢量化方法,并且比简单的udf更快。

from pyspark.sql.functions import pandas_udf,PandasUDFType

df3 = spark.createDataFrame(
[("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
("key", "value1", "value2")
)

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    gr = df['key'].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = df.value1.mean() + df.value2.mean()
    z = df.value1.mean() - df.value2.mean()
    return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]])

df3.groupby("key").apply(g).show()
Run Code Online (Sandbox Code Playgroud)

您将得到以下结果:

+---+----------+----------+-------+-------+
|key|avg_value1|avg_value2|sum_avg|sub_avg|
+---+----------+----------+-------+-------+
|  b|       6.5|      -1.5|    5.0|    8.0|
|  a|       0.0|      21.0|   21.0|  -21.0|
+---+----------+----------+-------+-------+
Run Code Online (Sandbox Code Playgroud)

因此,您可以在分组数据中的其他字段之间进行更多计算,并将它们以列表格式添加到数据框中。