I am testing the ElasticSearch and Spark integration on my local machine, using some test data loaded into Elasticsearch.
import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.elasticsearch.hadoop.mr.EsInputFormat

val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")
val esRDD = sc.hadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()
The code runs fine, and esRDD.first() successfully returns the correct result.
However, esRDD.collect() throws an exception:
java.io.NotSerializableException: org.apache.hadoop.io.Text
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I believe this is related to the issue mentioned here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html, so I added this line accordingly:
conf.set("spark.serializer", classOf[KryoSerializer].getName)
Is there anything else I should do to make it work? Thanks.
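For context, the root cause is visible in the stack trace: collect() must ship every record back to the driver, and with the default Java serializer this fails because org.apache.hadoop.io.Text does not implement java.io.Serializable. A minimal, Spark-free sketch of that behavior (TextLike and failsJavaSerialization are hypothetical names used only for illustration):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in: like org.apache.hadoop.io.Text, this class
// does NOT implement java.io.Serializable.
class TextLike(val value: String)

// Returns true when Java serialization rejects the object with
// NotSerializableException, which is exactly what happens to Text
// inside the task results of collect().
def failsJavaSerialization(obj: AnyRef): Boolean = {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try {
    out.writeObject(obj)
    false
  } catch {
    case _: NotSerializableException => true
  }
}
```

For example, failsJavaSerialization(new TextLike("a")) is true, while a plain String (which is Serializable) passes. This is why first() can appear to work while collect() fails: far fewer objects have to survive a serialization round trip.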
Update: the serialization setup issue has been solved, by using
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
instead of
conf.set("spark.serializer", classOf[KryoSerializer].getName)
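The distinction matters because Spark reads spark.serializer from the SparkConf used to construct the SparkContext; entries on the Hadoop JobConf are only handed to the InputFormat and never reach Spark's serializer selection. A sketch of the corrected configuration order (same imports as the original snippet):

```scala
// spark.serializer must be set on SparkConf *before* the
// SparkContext is created; setting it on the JobConf has no effect.
val sparkConf = new SparkConf()
  .setAppName("Test")
  .setMaster("local")
  .set("spark.serializer", classOf[KryoSerializer].getName)
val sc = new SparkContext(sparkConf)
```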
Now there is another problem. This dataset contains 1000 distinct records.
esRDD.count()
returns 1000 without problem; however, …
serialization elasticsearch apache-spark elasticsearch-hadoop