序列化和自定义Spark RDD类

llo*_*ett 1 serialization hadoop scala apache-spark rdd

我在Scala中编写了一个自定义的Spark RDD实现,我正在使用Spark shell调试我的实现.我现在的目标是获得:

customRDD.count
Run Code Online (Sandbox Code Playgroud)

没有例外就能成功.现在这就是我得到的:

15/03/06 23:02:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/03/06 23:02:32 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)

...

Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(ObjectStreamClass.java:2050)
    at java.io.ObjectStreamClass.getObjFieldValues(ObjectStreamClass.java:1252)
    ... 45 more
Run Code Online (Sandbox Code Playgroud)

"未能序列化任务0"引起了我的注意.我没有一个关于我正在发生的事情的精彩图片customRDD.count,而且还不清楚究竟什么是无法序列化的.

我的自定义RDD包括:

  • 自定义RDD类
  • 自定义分区类
  • 自定义(scala)迭代器类

我的Spark shell会话看起来像这样:

import custom.rdd.stuff
import org.apache.spark.SparkContext

val conf = sc.getConf
conf.set(custom, parameters)
sc.stop
sc2 = new SparkContext(conf)
val mapOfThings: Map[String, String] = ...
myRdd = customRDD(sc2, mapOfStuff)
myRdd.count

... (exception output) ...
Run Code Online (Sandbox Code Playgroud)

我想知道的是:

  • 为了创建自定义RDD类,需要什么"可序列化"?
  • 就Spark而言,"可序列化"是什么意思?这类似于Java的"Serializable"吗?
  • 从RDD的迭代器返回的所有数据(由compute方法返回)是否也需要可序列化?

非常感谢您对此问题的任何澄清.

Ken*_*ani 6

在Spark上下文上执行的代码需要存在于指示执行任务的工作节点的同一进程边界内.这意味着必须注意确保RDD自定义中引用的任何对象或值都是可序列化的.如果对象是不可序列化的,那么您需要确保它们的范围正确,以便每个分区都有该对象的新实例.

基本上,您不能共享Spark驱动程序上声明的对象的非可序列化实例,并期望将其状态复制到群集上的其他节点.

这是一个无法序列化非可序列化对象的示例:

NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");

rdd.map(s -> notSerializable.doSomething(s)).collect();
Run Code Online (Sandbox Code Playgroud)

下面的示例可以正常工作,因为它位于lambda的上下文中,它可以正确地分发到多个分区,而无需序列化非可序列化对象的实例的状态.这也适用于作为RDD定制(如果有)的一部分引用的非可序列化的传递依赖项.

rdd.forEachPartition(iter -> {
  NotSerializable notSerializable = new NotSerializable();

  // ...Now process iter
});
Run Code Online (Sandbox Code Playgroud)

有关更多详细信息,请参见此处:http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html


Dan*_*bos 5

除了肯尼的解释之外,我建议您打开序列化调试以查看导致问题的原因。通常,仅仅通过查看代码是人类不可能弄清楚的。

-Dsun.io.serialization.extendedDebugInfo=true
Run Code Online (Sandbox Code Playgroud)