使用带参数的 Grouped Map Pandas UDF

Yas*_*ne 19 python pyspark pandas-groupby

我想使用 data.groupby.apply() 将函数应用于每个组的 Pyspark Dataframe 的每一行。

我使用了 Grouped Map Pandas UDF。但是我不知道如何向我的函数添加另一个参数。

我尝试使用参数作为一个全局变量,但功能不recongnize它(我的观点是pyspark数据框),
我也试过在这一问题提出的解决方案(大熊猫据帧)使用熊猫GROUPBY()+应用()的参数

      @pandas_udf(schema,PandasUDFType.GROUPED_MAP)
        def function(key,data, interval):
            interval_df=interval.filter(interval["var"]==key).toPandas()
            for value in interval_df:
                  #Apply some operations

        return Data.groupBy("msn").apply(calc_diff, ('arg1'))

Run Code Online (Sandbox Code Playgroud)

或者

 @pandas_udf(schema,PandasUDFType.GROUPED_MAP)
        def function(key,data, interval):
            interval_df=interval.filter(interval["var"]==key).toPandas()
            for value in interval_df:
                  #Apply some operations

        return Data.groupBy("msn").apply(lambda x: calc_diff(x,'arg1'))
Run Code Online (Sandbox Code Playgroud)

但我收到错误:

ValueError: Invalid function: pandas_udfs with function type GROUPED_MAP 必须采用一个参数 (data) 或两个参数 (key, data)。

任何人都可以帮助我解决上述问题。

谢谢

小智 16

我喜欢@hwrd 的想法,但相反,它会使其成为一个生成器模式,以使其更容易集成,就像在@Feng 的示例中一样:

def function_generator(key):
    @pandas_udf(schema,PandasUDFType.GROUPED_MAP)
    def function(interval):
        interval_df=interval.filter(interval["var"]==key).toPandas()
        for value in interval_df:
              #Apply some operations
    return function

calc_diff = function_generator('arg1')
output = Data.groupBy("msn").apply(calc_diff)

Run Code Online (Sandbox Code Playgroud)


小智 6

您可以在您的函数中创建 pandas udf,以便在创建时知道函数参数。(或者您可以导入 functools 并使用部分函数求值来做同样的事情。)这是PySpark 文档中的示例,修改为传入一些参数:

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))


def my_function(df, by="id", column="v", value=1.0):
    schema = "{} long, {} double".format(by, column)

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def subtract_value(pdf):
        # pdf is a pandas.DataFrame
        v = pdf[column]
        g = pdf[by]
        return pdf.assign(v = v - g * value)

    return df.groupby(by).apply(subtract_value)

my_function(df, by="id", column="v", value=2.0).show()
Run Code Online (Sandbox Code Playgroud)


Fen*_*eng 2

我想你可以做这样的事情

def myfun(data, key, interval):
    #Apply some operations
    return something

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def myfun_udf(data):
    return myfun(data=data, key=mykey, interval=myinterval)


mykey=1
myinterval=2

Data.groupBy("msn").apply(myfun_udf)

Run Code Online (Sandbox Code Playgroud)