I want to pass an object from the driver node to the other nodes where an RDD lives, so that every partition of the RDD can access that object, as in the snippet below.
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object HelloSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Testing HelloSpark")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "xt.HelloKryoRegistrator")

    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 20, 4)
    val bytes = new ImmutableBytesWritable(Bytes.toBytes("This is a test"))

    // `bytes` is captured by the map closure below, so it must be serializable
    rdd.map(x => x.toString + "-" + Bytes.toString(bytes.get) + " !")
      .collect()
      .foreach(println)

    sc.stop()
  }
}
// My registrator
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class HelloKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer())
  }
}
// My serializer
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

class HelloSerializer extends Serializer[ImmutableBytesWritable] {
  override def write(kryo: Kryo, output: Output, obj: ImmutableBytesWritable): Unit = {
    output.writeInt(obj.getLength)
    output.writeInt(obj.getOffset)
    output.writeBytes(obj.get(), obj.getOffset, obj.getLength)
  }

  override def read(kryo: Kryo, input: Input, t: Class[ImmutableBytesWritable]): ImmutableBytesWritable = {
    val length = input.readInt()
    val offset = input.readInt() // offset within the original backing array; the fresh copy below starts at 0
    val bytes = new Array[Byte](length)
    input.read(bytes, 0, length) // fill the new array from index 0 (reading at `offset` would overflow it)
    new ImmutableBytesWritable(bytes)
  }
}
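To sanity-check the serializer in isolation, outside Spark, a local Kryo round-trip can confirm that write and read are symmetric. This is a hypothetical check I added, not part of my Spark job; it reuses the classes defined above:

val kryo = new com.esotericsoftware.kryo.Kryo()
new HelloKryoRegistrator().registerClasses(kryo)

// Serialize one value into an in-memory buffer (1 KB is plenty here)
val output = new com.esotericsoftware.kryo.io.Output(1024)
kryo.writeObject(output, new ImmutableBytesWritable(Bytes.toBytes("This is a test")))
output.close()

// Deserialize it again and check the payload survived
val input = new com.esotericsoftware.kryo.io.Input(output.toBytes)
val roundTripped = kryo.readObject(input, classOf[ImmutableBytesWritable])
println(Bytes.toString(roundTripped.get)) // should print "This is a test"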
In the snippet above, I tried to serialize an ImmutableBytesWritable through Kryo in Spark, so I did the following: set spark.serializer to KryoSerializer in the SparkConf, pointed spark.kryo.registrator at my own registrator, and wrote a custom Serializer for ImmutableBytesWritable (both shown above).
However, when I submit my Spark application in yarn-client mode, the following exception is thrown:
线程"main"org.apache.spark.SparkException中的异常:org.apache.spark.uxt.ClosureCleaner $. (ClosureCleaner.scala:158)org.apache.spark.SparkContext.clean(SparkContext.scala:1242)atg.apache.spark.rdd.RDD.map(RDD.scala:270)at xt.HelloSpark $ .main (HelloSpark.scala:23)at.HelloSpark.main(HelloSpark.scala)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)at sun.reflect.在org.apache.spark.deploy.SparkSubmit $ .launch(SparkSubmit.scala:325)的java.lang.reflect.Method.invoke(Method.java:606)中委托MethodAethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43). org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)中的apache.spark.deploy.SparkSubmit $ .main(SparkSubmit.scala:75)引起:java.io.NotSerializableException:org.apache.hadoop.hbase .io.Immutab java.io.ObjectOutputStream.drite上的java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)中的java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)中的leBytesWritable,java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)位于org.apache.spark.serializer的java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)的java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)中的.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431). org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)中的JavaSerializationStream.writeObject(JavaSerializer.scala:42)位于org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:164). ..还有12个
It seems that ImmutableBytesWritable cannot be serialized by Kryo. So what is the correct way to have Spark serialize an object with Kryo? Can Kryo serialize any type?
This happens because you use the ImmutableBytesWritable in your closure. Spark doesn't support closure serialization with Kryo yet, only serialization of the objects inside RDDs. You can take advantage of that to work around your problem, as described here:
Spark - Task not serializable: How to work with complex map closures that call outside classes/objects?
You simply need to serialize the objects before passing them through the closure, and de-serialize them afterwards. This approach just works, even if your classes aren't Serializable, because it uses Kryo behind the scenes. All you need is a bit of currying. ;)
Here is an example sketch:
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
             (foo: Foo): Bar = {
  kryoWrapper.value.apply(foo)
}

val mapper = genMapper(KryoSerializationWrapper(new MyMapper(new ImmutableBytesWritable(Bytes.toBytes("This is a test"))))) _
rdd.flatMap(mapper).collectAsMap()

// `MyMapper` is a placeholder name, and Foo and Bar stand in for your real input
// and output types. It must be a class rather than an `object` so that it can
// take constructor parameters.
class MyMapper(bytes: ImmutableBytesWritable) extends (Foo => Bar) {
  def apply(foo: Foo): Bar = { /* this is the real function */ }
}
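For contrast, here is a minimal sketch (my addition, reusing sc and the registrator from the question) of the case that Kryo does cover: when the ImmutableBytesWritable values are elements of the RDD rather than captures of a closure, the registered Kryo serializer applies whenever Spark serializes the data, for example for serialized caching or shuffles:

import org.apache.spark.storage.StorageLevel

// Each element is constructed on the executors, so the closure captures nothing
// non-serializable; the elements themselves go through the registered Kryo serializer.
val writables = sc.parallelize(1 to 20, 4)
  .map(i => new ImmutableBytesWritable(Bytes.toBytes("record-" + i)))
writables.persist(StorageLevel.MEMORY_ONLY_SER) // serialized storage uses Kryo here
println(writables.count())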