我试图弄清楚如何在 Python 中的类方法中使用selfinPandasUDF.GroupBy.Apply并在其中传递参数。我尝试了很多不同的方法但无法使其发挥作用。我还在互联网上广泛搜索了 PandasUDF 的示例,该示例在带有 self 和参数的类中使用,但找不到类似的内容。我知道如何用 来做所有前面提到的事情Pandas.GroupBy.Apply。
我可以让它工作的唯一方法是将其声明为静态方法
class Train:
return_type = StructType([
StructField("div_nbr", FloatType()),
StructField("store_nbr", FloatType()),
StructField("model_str", BinaryType())
])
function_type = PandasUDFType.GROUPED_MAP
def __init__(self):
............
def run_train(self):
output = sp_df.groupby(['A', 'B']).apply(self.model_train)
output.show(10)
@staticmethod
@pandas_udf(return_type, function_type)
def model_train(pd_df):
features_name = ['days_into_year', 'months_into_year', 'minutes_into_day', 'hour_of_day', 'recency']
X = pd_df[features_name].copy()
Y = pd.DataFrame(pd_df['trans_type_value']).copy()
estimator_1 = XGBRegressor(max_depth=3, learning_rate=0.1, n_estimators=300, verbosity=1,
objective='reg:squarederror', booster='gbtree', n_jobs=-1, gamma=0,
min_child_weight=5, max_delta_step=0, subsample=0.6, colsample_bytree=0.8,
colsample_bylevel=1, colsample_bynode=1, reg_alpha=0, reg_lambda=1,
scale_pos_weight=1, base_score=0.5, random_state=1234, missing=None,
importance_type='gain')
estimator_1.fit(X, Y)
df_to_return = pd_df[['div_nbr', 'store_nbr']].drop_duplicates().copy()
df_to_return['model_str'] = pickle.dumps(estimator_1)
return df_to_return
Run Code Online (Sandbox Code Playgroud)
我实际上想实现的是声明return_typeand function_type, features_namein __init__(),然后在 PandasUDF 中使用它,同时传递参数以在执行时在函数内部使用PandasUDF.GroupBy.Apply
如果有人可以帮助我,我将非常感激。我是 PySpark 的新手。
Pandas UDF 生命周期:
问题:
当处理额外的数据(例如类的 )时self,pandas udf 仍然需要序列化并发送该数据。序列化复杂的 python 对象(如类)不属于PyArrow 的功能,因此您必须创建一个包装函数并仅引用 pandas_udf 中的特定可序列化 python 类型,或者 2)使用 a@staticmethod来消除对self.
1 -在类中带有参数的Pandas UDF:用函数包装方法并在该包装器src中创建局部变量。请注意,pandas_udf 中引用的所有变量都必须受 PyArrow 支持。支持大多数 python 类型,但不支持类。
class my_class:
def __init__(self, s):
self.s = s
def wrapper_add_s(self, column):
local_s = self.s # create a local copy of s to be referenced by udf
@pandas_udf("string")
def add_s(column: pd.Series) -> pd.Series:
return column + f'_{local_s}'
return add_s(column)
def add_col(df):
return df.withColumn("Name", self.wrapper_add_s("Name"))
c = my_class(s='hi')
c.add_col(df)
Run Code Online (Sandbox Code Playgroud)
2 -类中没有参数的 Pandas UDF :使用@staticmethod装饰器
class my_class:
def __init__(self, s):
pass
@staticmethod
@pandas_udf("string")
def add_s(column: pd.Series) -> pd.Series:
return column + ' static string'
def add_col(df):
return df.withColumn("Name", self.wrapper_add_s("Name"))
c = my_class()
c.add_col(df)
Run Code Online (Sandbox Code Playgroud)
如果您正在寻找一个简单的结构来将参数传递给类外部的 pandas_udf,请使用这个... - src
def wrapper_add_s(column, s):
@pandas_udf("string")
def add_s(column: pd.Series) -> pd.Series:
return column + f'_{s}'
return add_s(column)
df = df.withColumn("Name", wrapper_add_s("Name", s='hi'))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2074 次 |
| 最近记录: |