我正在尝试创建一个可以从 Python 中调用的用户自定义聚合函数(UDAF)。我尝试按照这个问题的答案去做,基本上实现了以下代码(取自这里):
package com.blu.bla;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.Row;
public class MySum extends UserDefinedAggregateFunction {
// Schema of the rows fed to this aggregate; populated in the constructor with a
// single double field named "inputDouble" and returned by inputSchema().
private StructType _inputDataType;
// Schema of the aggregation buffer; populated in the constructor with a single
// double field named "bufferDouble".
private StructType _bufferSchema;
// Type of the aggregate's result; set to DataTypes.DoubleType in the constructor.
private DataType _returnDataType;
/**
 * Initializes the three type descriptors held by this aggregate: a one-field
 * input schema ("inputDouble"), a one-field buffer schema ("bufferDouble"),
 * and a double result type.
 */
public MySum() {
    // The result type does not depend on the schemas, so set it up front.
    _returnDataType = DataTypes.DoubleType;

    // Input rows carry one double column; the trailing `true` is the
    // nullable flag of createStructField.
    final StructField inputColumn =
        DataTypes.createStructField("inputDouble", DataTypes.DoubleType, true);
    final List<StructField> inputColumns = new ArrayList<>();
    inputColumns.add(inputColumn);
    _inputDataType = DataTypes.createStructType(inputColumns);

    // The buffer mirrors the input layout: one double slot.
    final StructField bufferColumn =
        DataTypes.createStructField("bufferDouble", DataTypes.DoubleType, true);
    final List<StructField> bufferColumns = new ArrayList<>();
    bufferColumns.add(bufferColumn);
    _bufferSchema = DataTypes.createStructType(bufferColumns);
}
/**
 * Returns the input schema that was assembled in the constructor.
 *
 * @return the struct type stored in {@code _inputDataType}
 */
@Override
public StructType inputSchema() {
    return this._inputDataType;
}
@Override public StructType …Run Code Online (Sandbox Code Playgroud)