如何在Pyspark中使用Scala类

Alb*_*nto 19 python scala apache-spark apache-spark-sql pyspark

如果有任何方法可以使用Scala课程Pyspark,我一直在寻找一段时间,而且我没有找到任何关于这个主题的文档或指南.

假设我创建了一个简单的类,Scala它使用了一些库apache-spark,例如:

class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
  def exe(): DataFrame = {
    import sqlContext.implicits._

    df.select(col(column))
  }
}
Run Code Online (Sandbox Code Playgroud)
  • 有没有可能的方法来使用这个类Pyspark
  • 太难了吗?
  • 我必须创建一个.py文件吗?
  • 是否有任何指南说明如何做到这一点?

顺便说一句,我也查看了spark代码,感觉有点迷失,我无法为自己的目的复制它们的功能.

zer*_*323 26

是的,虽然可能远非微不足道,但它是可能的.通常,您需要一个Java(友好)包装器,因此您不必处理使用普通Java无法轻松表达的Scala功能,因此无法与Py4J网关很好地协作.

假设你的类是int的包com.example并且DataFrame调用了Pythondf

df = ... # Python DataFrame
Run Code Online (Sandbox Code Playgroud)

你必须:

  1. 使用您最喜欢的构建工具构建jar .

  2. 将它包含在驱动程序类路径中,例如使用--driver-class-pathPySpark shell /的参数spark-submit.根据确切的代码可能无法使用通过它--jars以及

  3. 从Python SparkContext实例中提取JVM 实例:

    jvm = sc._jvm
    
    Run Code Online (Sandbox Code Playgroud)
  4. SQLContextSQLContext实例中提取Scala :

    ssqlContext = sqlContext._ssql_ctx
    
    Run Code Online (Sandbox Code Playgroud)
  5. DataFrame从以下内容中提取Java df:

    jdf = df._jdf
    
    Run Code Online (Sandbox Code Playgroud)
  6. 创建新实例SimpleClass:

    simpleObject = jvm.com.example.SimpleClass(ssqlContext, jdf, "v")
    
    Run Code Online (Sandbox Code Playgroud)
  7. 调用exe方法并使用Python包装结果DataFrame:

    from pyspark.sql import DataFrame
    
    DataFrame(simpleObject.exe(), ssqlContext)
    
    Run Code Online (Sandbox Code Playgroud)

结果应该是有效的PySpark DataFrame.您当然可以将所有步骤合并为一个呼叫.

重要提示:只有在Python代码仅在驱动程序上执行时,才可以使用此方法.它不能在Python动作或转换中使用.请参阅如何从操作或转换中使用Java/Scala函数?详情.


rwp*_*rwp 5

作为对@zero323答案的更新,鉴于 Spark 的 API 在过去六年中不断发展,适用于 Spark-3.2 的配方如下:

  1. 将 Scala 代码编译成 JAR 文件(例如使用sbt assembly
  2. 将 JAR 文件与本地包定义所需的任何参数一起包含在--jars参数中spark-submit--py-files
  3. 在 Python 中提取 JVM 实例:
jvm = spark._jvm
Run Code Online (Sandbox Code Playgroud)
  1. 提取 Java 表示形式SparkSession
jSess = spark._jsparkSession
Run Code Online (Sandbox Code Playgroud)
  1. DataFrame提取要传递到 Scala 方法中的PySpark“df”的 Java 句柄:
jdf = df._jdf
Run Code Online (Sandbox Code Playgroud)
  1. 从 PySpark 中创建一个新实例SimpleClass
simpleObject = jvm.com.example.SimpleClass(jSess, jdf, "v")
Run Code Online (Sandbox Code Playgroud)
  1. 调用该exe方法并将其输出转换为 PySpark DataFrame
from pyspark.sql import DataFrame

result = DataFrame(simpleObject.exe(), spark)
Run Code Online (Sandbox Code Playgroud)

如果您需要传递其他参数,例如 Python 字典,PySpark 可能会自动将它们转换为相应的Java类型,然后再出现在您的 Scala 方法中。Scala 提供了该JavaConverters包来帮助将其转换为更自然的 Scala 数据类型。例如,Python 字典可以传递到 Scala 方法中,并立即从 Java HashMap 转换为 Scala(可变)Map:

def processDict(spark: SparkSession, jparams: java.util.Map[String, Any]) {
  import scala.collection.JavaConverters._
  val params = jparams.asScala
  ...
}
Run Code Online (Sandbox Code Playgroud)