Tags: serialization, scala, apache-spark
My test code is quite simple, almost copied straight from the Spark examples:
import org.apache.spark.sql.SparkSession
import scala.util.Properties
class MyTest(sparkSession: SparkSession, properties: java.util.Properties) {
  val spark: SparkSession = sparkSession
  val sparkHome = Properties.envOrElse("SPARK_HOME", "/spark")
  val props = properties

  def run(): Unit = {
    val logFile = sparkHome + "/README.md"
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains(props.get("v1"))).count()
    val numBs = logData.filter(line => line.contains(props.get("v2"))).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
  }
}
However, when I try to run it, it always reports Exception in thread "main" org.apache.spark.SparkException: Task not serializable, pointing at the line val numAs = logData.filter(line => line.contains(props.get("v1"))).count()
Well, after I changed it to
val v1 = props.get("v1")
val v2 = props.get("v2")
val numAs = logData.filter(line => line.contains(v1)).count()
val numBs = logData.filter(line => line.contains(v2)).count()
the exception went away. I think the reason is that Spark complains that props cannot be serialized. But java.util.Properties actually implements java.io.Serializable:
class Properties extends Hashtable<Object,Object> {
Run Code Online (Sandbox Code Playgroud)
and so does Hashtable:
public class Hashtable<K,V>
    extends Dictionary<K,V>
    implements Map<K,V>, Cloneable, java.io.Serializable {
Why do I still get this exception?
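(A quick standalone check confirms that a Properties instance on its own does survive a plain Java serialization round trip; PropsCheck below is just a hypothetical name for this experiment:)

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Properties

object PropsCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("v1", "a")

    // Serialize: no NotSerializableException is thrown here,
    // so the Properties object itself is not the problem.
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(props)
    out.close()

    // Deserialize and read the value back.
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val copy = in.readObject().asInstanceOf[Properties]
    println(copy.getProperty("v1")) // prints "a"
  }
}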
The reason I need this is that my Spark job takes some command-line arguments, and I need to pass them to my Spark job class instance. What is the best practice for doing this?
The line
line => line.contains(props.get("v1"))
implicitly captures this, the MyTest instance, because it is the same as:
line => line.contains(this.props.get("v1"))
and MyTest is not serializable.
Define val props = properties inside the run() method, rather than in the class body.
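A minimal sketch of the class with that fix applied (also switching to getProperty, which returns a String, where the original used the raw get):

import org.apache.spark.sql.SparkSession
import scala.util.Properties

class MyTest(sparkSession: SparkSession, properties: java.util.Properties) {
  val spark: SparkSession = sparkSession
  val sparkHome = Properties.envOrElse("SPARK_HOME", "/spark")

  def run(): Unit = {
    // Local val: the filter closures below capture this serializable
    // Properties object instead of the unserializable `this`.
    val props = properties
    val logFile = sparkHome + "/README.md"
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains(props.getProperty("v1"))).count()
    val numBs = logData.filter(line => line.contains(props.getProperty("v2"))).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
  }
}

Copying just the needed values into locals, as in your own workaround (val v1 = props.getProperty("v1")), shrinks the shipped closure even further, since only two strings are serialized instead of the whole Properties table.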