Yas*_*ne 19 python pyspark pandas-groupby
我想使用 data.groupby.apply() 将函数应用于每个组的 Pyspark Dataframe 的每一行。
我使用了 Grouped Map Pandas UDF。但是我不知道如何向我的函数添加另一个参数。
我尝试使用参数作为一个全局变量,但功能不recongnize它(我的观点是pyspark数据框),
我也试过在这一问题提出的解决方案(大熊猫据帧)使用熊猫GROUPBY()+应用()的参数
@pandas_udf(schema,PandasUDFType.GROUPED_MAP)
def function(key,data, interval):
interval_df=interval.filter(interval["var"]==key).toPandas()
for value in interval_df:
#Apply some operations
return Data.groupBy("msn").apply(calc_diff, ('arg1'))
Run Code Online (Sandbox Code Playgroud)
或者
@pandas_udf(schema,PandasUDFType.GROUPED_MAP)
def function(key,data, interval):
interval_df=interval.filter(interval["var"]==key).toPandas()
for value in interval_df:
#Apply some operations
return Data.groupBy("msn").apply(lambda x: calc_diff(x,'arg1'))
Run Code Online (Sandbox Code Playgroud)
但我收到错误:
ValueError: Invalid function: pandas_udfs with function type GROUPED_MAP 必须采用一个参数 (data) 或两个参数 (key, data)。
任何人都可以帮助我解决上述问题。
谢谢
小智 16
我喜欢@hwrd 的想法,但相反,它会使其成为一个生成器模式,以使其更容易集成,就像在@Feng 的示例中一样:
def function_generator(key):
@pandas_udf(schema,PandasUDFType.GROUPED_MAP)
def function(interval):
interval_df=interval.filter(interval["var"]==key).toPandas()
for value in interval_df:
#Apply some operations
return function
calc_diff = function_generator('arg1')
output = Data.groupBy("msn").apply(calc_diff)
Run Code Online (Sandbox Code Playgroud)
小智 6
您可以在您的函数中创建 pandas udf,以便在创建时知道函数参数。(或者您可以导入 functools 并使用部分函数求值来做同样的事情。)这是PySpark 文档中的示例,修改为传入一些参数:
from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def my_function(df, by="id", column="v", value=1.0):
schema = "{} long, {} double".format(by, column)
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def subtract_value(pdf):
# pdf is a pandas.DataFrame
v = pdf[column]
g = pdf[by]
return pdf.assign(v = v - g * value)
return df.groupby(by).apply(subtract_value)
my_function(df, by="id", column="v", value=2.0).show()
Run Code Online (Sandbox Code Playgroud)
我想你可以做这样的事情
def myfun(data, key, interval):
#Apply some operations
return something
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def myfun_udf(data):
return myfun(data=data, key=mykey, interval=myinterval)
mykey=1
myinterval=2
Data.groupBy("msn").apply(myfun_udf)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7161 次 |
| 最近记录: |