Alb*_*nto 19 python scala apache-spark apache-spark-sql pyspark
如果有任何方法可以使用Scala课程Pyspark,我一直在寻找一段时间,而且我没有找到任何关于这个主题的文档或指南.
假设我创建了一个简单的类,Scala它使用了一些库apache-spark,例如:
class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
def exe(): DataFrame = {
import sqlContext.implicits._
df.select(col(column))
}
}
Run Code Online (Sandbox Code Playgroud)
Pyspark?.py文件吗? 顺便说一句,我也查看了spark代码,感觉有点迷失,我无法为自己的目的复制它们的功能.
zer*_*323 26
是的,虽然可能远非微不足道,但它是可能的.通常,您需要一个Java(友好)包装器,因此您不必处理使用普通Java无法轻松表达的Scala功能,因此无法与Py4J网关很好地协作.
假设你的类是int的包com.example并且DataFrame调用了Pythondf
df = ... # Python DataFrame
Run Code Online (Sandbox Code Playgroud)
你必须:
使用您最喜欢的构建工具构建jar .
将它包含在驱动程序类路径中,例如使用--driver-class-pathPySpark shell /的参数spark-submit.根据确切的代码可能无法使用通过它--jars以及
从Python SparkContext实例中提取JVM 实例:
jvm = sc._jvm
Run Code Online (Sandbox Code Playgroud)SQLContext从SQLContext实例中提取Scala :
ssqlContext = sqlContext._ssql_ctx
Run Code Online (Sandbox Code Playgroud)DataFrame从以下内容中提取Java df:
jdf = df._jdf
Run Code Online (Sandbox Code Playgroud)创建新实例SimpleClass:
simpleObject = jvm.com.example.SimpleClass(ssqlContext, jdf, "v")
Run Code Online (Sandbox Code Playgroud)调用exe方法并使用Python包装结果DataFrame:
from pyspark.sql import DataFrame
DataFrame(simpleObject.exe(), ssqlContext)
Run Code Online (Sandbox Code Playgroud)结果应该是有效的PySpark DataFrame.您当然可以将所有步骤合并为一个呼叫.
重要提示:只有在Python代码仅在驱动程序上执行时,才可以使用此方法.它不能在Python动作或转换中使用.请参阅如何从操作或转换中使用Java/Scala函数?详情.
作为对@zero323答案的更新,鉴于 Spark 的 API 在过去六年中不断发展,适用于 Spark-3.2 的配方如下:
sbt assembly)--jars参数中spark-submit--py-filesjvm = spark._jvm
Run Code Online (Sandbox Code Playgroud)
SparkSession:jSess = spark._jsparkSession
Run Code Online (Sandbox Code Playgroud)
DataFrame提取要传递到 Scala 方法中的PySpark“df”的 Java 句柄:jdf = df._jdf
Run Code Online (Sandbox Code Playgroud)
SimpleClass:simpleObject = jvm.com.example.SimpleClass(jSess, jdf, "v")
Run Code Online (Sandbox Code Playgroud)
exe方法并将其输出转换为 PySpark DataFrame:from pyspark.sql import DataFrame
result = DataFrame(simpleObject.exe(), spark)
Run Code Online (Sandbox Code Playgroud)
如果您需要传递其他参数,例如 Python 字典,PySpark 可能会自动将它们转换为相应的Java类型,然后再出现在您的 Scala 方法中。Scala 提供了该JavaConverters包来帮助将其转换为更自然的 Scala 数据类型。例如,Python 字典可以传递到 Scala 方法中,并立即从 Java HashMap 转换为 Scala(可变)Map:
def processDict(spark: SparkSession, jparams: java.util.Map[String, Any]) {
import scala.collection.JavaConverters._
val params = jparams.asScala
...
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8355 次 |
| 最近记录: |