比方说,我的团队选择Python作为Spark开发的参考语言.但是后来由于性能原因,我们希望开发特定的Scala或Java特定的库,以便使用我们的Python代码(类似于使用Scala或Java骨架的Python存根)进行映射.
难道您不认为是否可以将新的自定义Python方法与一些Scala或Java用户定义函数联系起来?
我需要创建一个在pyspark python中使用的UDF,它使用java对象进行内部计算.
如果它是一个简单的python,我会做类似的事情:
def f(x):
return 7
fudf = pyspark.sql.functions.udf(f,pyspark.sql.types.IntegerType())
Run Code Online (Sandbox Code Playgroud)
并使用以下方式调用:
df = sqlContext.range(0,5)
df2 = df.withColumn("a",fudf(df.id)).show()
Run Code Online (Sandbox Code Playgroud)
但是,我需要的函数的实现是在java而不是在python中.我需要以某种方式包装它,所以我可以从python中以类似的方式调用它.
我的第一个尝试是实现java对象,然后将其包装在pyspark中的python中并将其转换为UDF.因序列化错误而失败.
Java代码:
package com.test1.test2;
public class TestClass1 {
Integer internalVal;
public TestClass1(Integer val1) {
internalVal = val1;
}
public Integer do_something(Integer val) {
return internalVal;
}
}
Run Code Online (Sandbox Code Playgroud)
pyspark代码:
from py4j.java_gateway import java_import
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
java_import(sc._gateway.jvm, "com.test1.test2.TestClass1")
a = sc._gateway.jvm.com.test1.test2.TestClass1(7)
audf = udf(a,IntegerType())
Run Code Online (Sandbox Code Playgroud)
错误:
---------------------------------------------------------------------------
Py4JError Traceback (most recent call last)
<ipython-input-2-9756772ab14f> in <module>()
4 java_import(sc._gateway.jvm, "com.test1.test2.TestClass1") …Run Code Online (Sandbox Code Playgroud) 我正在尝试在PySpark中运行自定义HDFS阅读器类。这个类是用Java编写的,我需要从PySpark或从shell或通过spark-submit访问它。
在PySpark中,我从SparkContext(sc._gateway)中检索JavaGateway 。
说我有一堂课:
package org.foo.module
public class Foo {
public int fooMethod() {
return 1;
}
}
Run Code Online (Sandbox Code Playgroud)
我试图将其打包到一个jar中,并将其与--jar选项一起传递给pyspark,然后运行:
from py4j.java_gateway import java_import
jvm = sc._gateway.jvm
java_import(jvm, "org.foo.module.*")
foo = jvm.org.foo.module.Foo()
Run Code Online (Sandbox Code Playgroud)
但是我得到了错误:
Py4JError: Trying to call a package.
Run Code Online (Sandbox Code Playgroud)
有人可以帮忙吗?谢谢。