如何在课堂上使用 Pandas UDF

dev*_*tel 8 pandas pyspark

我试图弄清楚如何在 Python 中的类方法中使用selfinPandasUDF.GroupBy.Apply并在其中传递参数。我尝试了很多不同的方法但无法使其发挥作用。我还在互联网上广泛搜索了 PandasUDF 的示例,该示例在带有 self 和参数的类中使用,但找不到类似的内容。我知道如何用 来做所有前面提到的事情Pandas.GroupBy.Apply

我可以让它工作的唯一方法是将其声明为静态方法

class Train:
    return_type = StructType([
        StructField("div_nbr", FloatType()),
        StructField("store_nbr", FloatType()),
        StructField("model_str", BinaryType())
    ])
    function_type = PandasUDFType.GROUPED_MAP

    def __init__(self):
       ............

    def run_train(self):
         output = sp_df.groupby(['A', 'B']).apply(self.model_train)
         output.show(10)

    @staticmethod
    @pandas_udf(return_type, function_type)
    def model_train(pd_df):
        features_name = ['days_into_year', 'months_into_year', 'minutes_into_day', 'hour_of_day', 'recency']

        X = pd_df[features_name].copy()
        Y = pd.DataFrame(pd_df['trans_type_value']).copy()

        estimator_1 = XGBRegressor(max_depth=3, learning_rate=0.1, n_estimators=300, verbosity=1,
                                   objective='reg:squarederror', booster='gbtree', n_jobs=-1, gamma=0,
                                   min_child_weight=5, max_delta_step=0, subsample=0.6, colsample_bytree=0.8,
                                   colsample_bylevel=1, colsample_bynode=1, reg_alpha=0, reg_lambda=1,
                                   scale_pos_weight=1, base_score=0.5, random_state=1234, missing=None,
                                   importance_type='gain')
        estimator_1.fit(X, Y)
        df_to_return = pd_df[['div_nbr', 'store_nbr']].drop_duplicates().copy()
        df_to_return['model_str'] = pickle.dumps(estimator_1)

        return df_to_return
Run Code Online (Sandbox Code Playgroud)

我实际上想实现的是声明return_typeand function_type, features_namein __init__(),然后在 PandasUDF 中使用它,同时传递参数以在执行时在函数内部使用PandasUDF.GroupBy.Apply

如果有人可以帮助我,我将非常感激。我是 PySpark 的新手。

Mic*_*erk 5

背景

Pandas UDF 生命周期:

  1. Spark 数据类型(Column、DataFrame)通过 PyArrow 序列化为 Arrow 的表格式
  2. 该数据被发送到 python 虚拟环境 (VM),该虚拟环境是在每个执行器内 JIT 创建的。
  3. 在到达 python VM 之前,数据被反序列化为 pandas Column/DataFrame,并且 pandas_udf 代码在该 Column/DataFrame 上运行。
  4. Pandas 输出被序列化回 Arrow 的表格格式。
  5. python VM 将数据发送回调用进程。
  6. 在到达 Spark 环境之前,Arrow Table 被解码回 Spark 数据类型。

问题:

当处理额外的数据(例如类的 )时self,pandas udf 仍然需要序列化并发送该数据。序列化复杂的 python 对象(如类)不属于PyArrow 的功能,因此您必须创建一个包装函数并仅引用 pandas_udf 中的特定可序列化 python 类型,或者 2)使用 a@staticmethod来消除对self.

解决方案

1 -在类中带有参数的Pandas UDF:用函数包装方法并在该包装器src中创建局部变量。请注意,pandas_udf 中引用的所有变量都必须受 PyArrow 支持。支持大多数 python 类型,但不支持类。

class my_class:
  def __init__(self, s):
    self.s = s
  
  def wrapper_add_s(self, column):
    local_s = self.s # create a local copy of s to be referenced by udf

    @pandas_udf("string")
    def add_s(column: pd.Series) -> pd.Series:
      return column + f'_{local_s}'

    return add_s(column)


  def add_col(df):
    return df.withColumn("Name", self.wrapper_add_s("Name"))

c = my_class(s='hi')
c.add_col(df)
Run Code Online (Sandbox Code Playgroud)

2 -类中没有参数的 Pandas UDF :使用@staticmethod装饰器

class my_class:
  def __init__(self, s):
    pass

  @staticmethod
  @pandas_udf("string")
  def add_s(column: pd.Series) -> pd.Series:
    return column + ' static string'


  def add_col(df):
    return df.withColumn("Name", self.wrapper_add_s("Name"))

c = my_class()
c.add_col(df)

Run Code Online (Sandbox Code Playgroud)

不在一个班级

如果您正在寻找一个简单的结构来将参数传递给类外部的 pandas_udf,请使用这个... - src

def wrapper_add_s(column, s):

  @pandas_udf("string")
  def add_s(column: pd.Series) -> pd.Series:
    return column + f'_{s}'

  return add_s(column)


df = df.withColumn("Name", wrapper_add_s("Name", s='hi'))
Run Code Online (Sandbox Code Playgroud)