相关疑难解决方法(0)

Spark:如何使用Scala或Java用户定义函数映射Python?

比方说,我的团队选择Python作为Spark开发的参考语言.但是后来由于性能原因,我们希望开发特定的Scala或Java特定的库,以便使用我们的Python代码(类似于使用Scala或Java骨架的Python存根)进行映射.

难道您不认为是否可以将新的自定义Python方法与一些Scala或Java用户定义函数联系起来?

python java scala apache-spark pyspark

21
推荐指数
1
解决办法
1万
查看次数

实现java UDF并从pyspark调用它

我需要创建一个在pyspark python中使用的UDF,它使用java对象进行内部计算.

如果它是一个简单的python,我会做类似的事情:

def f(x):
    return 7
fudf = pyspark.sql.functions.udf(f,pyspark.sql.types.IntegerType())
Run Code Online (Sandbox Code Playgroud)

并使用以下方式调用:

df = sqlContext.range(0,5)
df2 = df.withColumn("a",fudf(df.id)).show()
Run Code Online (Sandbox Code Playgroud)

但是,我需要的函数的实现是在java而不是在python中.我需要以某种方式包装它,所以我可以从python中以类似的方式调用它.

我的第一个尝试是实现java对象,然后将其包装在pyspark中的python中并将其转换为UDF.因序列化错误而失败.

Java代码:

package com.test1.test2;

public class TestClass1 {
    Integer internalVal;
    public TestClass1(Integer val1) {
        internalVal = val1;
    }
    public Integer do_something(Integer val) {
        return internalVal;
    }    
}
Run Code Online (Sandbox Code Playgroud)

pyspark代码:

from py4j.java_gateway import java_import
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
java_import(sc._gateway.jvm, "com.test1.test2.TestClass1")
a = sc._gateway.jvm.com.test1.test2.TestClass1(7)
audf = udf(a,IntegerType())
Run Code Online (Sandbox Code Playgroud)

错误:

---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
<ipython-input-2-9756772ab14f> in <module>()
      4 java_import(sc._gateway.jvm, "com.test1.test2.TestClass1") …
Run Code Online (Sandbox Code Playgroud)

python java py4j apache-spark pyspark

9
推荐指数
1
解决办法
3598
查看次数

在PySpark中运行自定义Java类

我正在尝试在PySpark中运行自定义HDFS阅读器类。这个类是用Java编写的,我需要从PySpark或从shell或通过spark-submit访问它。

在PySpark中,我从SparkContext(sc._gateway)中检索JavaGateway 。

说我有一堂课:

package org.foo.module

public class Foo {

    public int fooMethod() {
        return 1;
    }

}
Run Code Online (Sandbox Code Playgroud)

我试图将其打包到一个jar中,并将其与--jar选项一起传递给pyspark,然后运行:

from py4j.java_gateway import java_import

jvm = sc._gateway.jvm
java_import(jvm, "org.foo.module.*")

foo = jvm.org.foo.module.Foo()
Run Code Online (Sandbox Code Playgroud)

但是我得到了错误:

Py4JError: Trying to call a package.
Run Code Online (Sandbox Code Playgroud)

有人可以帮忙吗?谢谢。

python java py4j apache-spark pyspark

5
推荐指数
2
解决办法
6687
查看次数

标签 统计

apache-spark ×3

java ×3

pyspark ×3

python ×3

py4j ×2

scala ×1