相关疑难解决方法(0)

在PySpark中的GroupedData上应用UDF(具有正常运行的python示例)

我有这个在pandas数据帧中本地运行的python代码:

df_result = pd.DataFrame(df
                          .groupby('A')
                          .apply(lambda x: myFunction(zip(x.B, x.C), x.name))
Run Code Online (Sandbox Code Playgroud)

我想在PySpark中运行它,但在处理pyspark.sql.group.GroupedData对象时遇到问题.

我尝试过以下方法:

sparkDF
 .groupby('A')
 .agg(myFunction(zip('B', 'C'), 'A')) 
Run Code Online (Sandbox Code Playgroud)

返回

KeyError: 'A'
Run Code Online (Sandbox Code Playgroud)

我推测因为'A'不再是一列而我找不到x.name的等价物.

然后

sparkDF
 .groupby('A')
 .map(lambda row: Row(myFunction(zip('B', 'C'), 'A'))) 
 .toDF()
Run Code Online (Sandbox Code Playgroud)

但是得到以下错误:

AttributeError: 'GroupedData' object has no attribute 'map'
Run Code Online (Sandbox Code Playgroud)

任何建议将非常感谢!

python user-defined-functions apache-spark apache-spark-sql pyspark

27
推荐指数
4
解决办法
2万
查看次数

如何定义自定义聚合函数来对一列向量求和?

我有一个两列的DataFrame,ID类型IntVec类型Vector(org.apache.spark.mllib.linalg.Vector).

DataFrame如下所示:

ID,Vec
1,[0,0,5]
1,[4,0,1]
1,[1,2,1]
2,[7,5,0]
2,[3,3,4]
3,[0,8,1]
3,[0,0,1]
3,[7,7,7]
....
Run Code Online (Sandbox Code Playgroud)

我想groupBy($"ID")通过对向量求和来对每个组内的行应用聚合.

上述示例的所需输出将是:

ID,SumOfVectors
1,[5,2,7]
2,[10,8,4]
3,[7,15,9]
...
Run Code Online (Sandbox Code Playgroud)

可用的聚合函数将不起作用,例如,df.groupBy($"ID").agg(sum($"Vec")将导致ClassCastException.

如何实现自定义聚合函数,允许我进行向量或数组的总和或任何其他自定义操作?

scala aggregate-functions apache-spark apache-spark-sql apache-spark-ml

22
推荐指数
1
解决办法
1万
查看次数