Ass*_*son 6 python java apache-spark pyspark
我正在尝试创建一个用户定义的聚合函数,我可以从python中调用它.我试着按照这个问题的答案.我基本上实现了以下(取自这里):
package com.blu.bla;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.Row;
public class MySum extends UserDefinedAggregateFunction {
private StructType _inputDataType;
private StructType _bufferSchema;
private DataType _returnDataType;
public MySum() {
List<StructField> inputFields = new ArrayList<StructField>();
inputFields.add(DataTypes.createStructField("inputDouble", DataTypes.DoubleType, true));
_inputDataType = DataTypes.createStructType(inputFields);
List<StructField> bufferFields = new ArrayList<StructField>();
bufferFields.add(DataTypes.createStructField("bufferDouble", DataTypes.DoubleType, true));
_bufferSchema = DataTypes.createStructType(bufferFields);
_returnDataType = DataTypes.DoubleType;
}
@Override public StructType inputSchema() {
return _inputDataType;
}
@Override public StructType bufferSchema() {
return _bufferSchema;
}
@Override public DataType dataType() {
return _returnDataType;
}
@Override public boolean deterministic() {
return true;
}
@Override public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, null);
}
@Override public void update(MutableAggregationBuffer buffer, Row input) {
if (!input.isNullAt(0)) {
if (buffer.isNullAt(0)) {
buffer.update(0, input.getDouble(0));
} else {
Double newValue = input.getDouble(0) + buffer.getDouble(0);
buffer.update(0, newValue);
}
}
}
@Override public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
if (!buffer2.isNullAt(0)) {
if (buffer1.isNullAt(0)) {
buffer1.update(0, buffer2.getDouble(0));
} else {
Double newValue = buffer2.getDouble(0) + buffer1.getDouble(0);
buffer1.update(0, newValue);
}
}
}
@Override public Object evaluate(Row buffer) {
if (buffer.isNullAt(0)) {
return null;
} else {
return buffer.getDouble(0);
}
}
}
Run Code Online (Sandbox Code Playgroud)
然后我用所有依赖项编译它并使用--jars myjar.jar运行pyspark
在pyspark我做了:
df = sqlCtx.createDataFrame([(1.0, "a"), (2.0, "b"), (3.0, "C")], ["A", "B"])
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql import Row
def myCol(col):
_f = sc._jvm.com.blu.bla.MySum.apply
return Column(_f(_to_seq(sc,[col], _to_java_column)))
b = df.agg(myCol("A"))
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-24-f45b2a367e67> in <module>()
----> 1 b = df.agg(myCol("A"))
<ipython-input-22-afcb8884e1db> in myCol(col)
4 def myCol(col):
5 _f = sc._jvm.com.blu.bla.MySum.apply
----> 6 return Column(_f(_to_seq(sc,[col], _to_java_column)))
TypeError: 'JavaPackage' object is not callable
Run Code Online (Sandbox Code Playgroud)
我也尝试在pyspark调用中添加--driver-class-path但得到了相同的结果.
还尝试通过java import访问java类:
from py4j.java_gateway import java_import
jvm = sc._gateway.jvm
java_import(jvm, "com.bla.blu.MySum")
def myCol2(col):
_f = jvm.bla.blu.MySum.apply
return Column(_f(_to_seq(sc,[col], _to_java_column)))
Run Code Online (Sandbox Code Playgroud)
还尝试简单地创建类(如此处所示):
a = jvm.com.bla.blu.MySum()
Run Code Online (Sandbox Code Playgroud)
所有人都收到相同的错误消息.
我似乎无法弄清问题是什么.
所以看来主要问题是,如果给出相对路径,则添加 jar 的所有选项(--jars、驱动程序类路径、SPARK_CLASSPATH)都无法正常工作。这可能是因为 ipython 内部的工作目录存在问题,而不是我运行 pyspark 的地方。
一旦我将其更改为绝对路径,它就可以工作(尚未在集群上测试它,但至少它可以在本地安装上工作)。
另外,我不确定这是否也是这里答案中的一个错误,因为该答案使用了 scala 实现,但是在 java 实现中我需要做
def myCol(col):
_f = sc._jvm.com.blu.bla.MySum().apply
return Column(_f(_to_seq(sc,[col], _to_java_column)))
Run Code Online (Sandbox Code Playgroud)
这可能不是很有效,因为它每次都会创建 _f,相反,我应该在函数外部定义 _f (同样,这需要在集群上进行测试),但至少现在它提供了正确的功能答案
| 归档时间: |
|
| 查看次数: |
5794 次 |
| 最近记录: |