use*_*226 10 serialization elasticsearch apache-spark elasticsearch-hadoop
我正在使用elasticsearch中加载的一些测试数据在本地计算机上测试ElasticSearch和Spark集成.
val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")
val esRDD = sc.hadoopRDD(conf,classOf[EsInputFormat[Text, MapWritable]],
classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()
Run Code Online (Sandbox Code Playgroud)
代码运行正常并使用esRDD.first()成功返回正确的结果
但是,esRDD.collect()将生成异常:
java.io.NotSerializableException: org.apache.hadoop.io.Text
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
我相信这与这里提到的问题有关http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html 所以我相应地添加了这一行
conf.set("spark.serializer", classOf[KryoSerializer].getName)
Run Code Online (Sandbox Code Playgroud)
我应该做些什么来让它发挥作用吗?谢谢
更新:序列化设置问题已解决.通过使用
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
Run Code Online (Sandbox Code Playgroud)
代替
conf.set("spark.serializer", classOf[KryoSerializer].getName)
Run Code Online (Sandbox Code Playgroud)
现在还有另一个此数据集中有1000条不同的记录
esRDD.count()
Run Code Online (Sandbox Code Playgroud)
但是,返回1000没问题
esRDD.distinct().count()
Run Code Online (Sandbox Code Playgroud)
返回5!如果我打印记录
esRDD.foreach(println)
Run Code Online (Sandbox Code Playgroud)
它正确打印出1000条记录.但如果我使用收集或采取
esRDD.collect().foreach(println)
esRDD.take(10).foreach(println)
Run Code Online (Sandbox Code Playgroud)
它将打印DUPLICATED记录,并且确实只显示了5个UNIQUE记录,这似乎是整个数据集的随机子集 - 它不是前5个记录.如果我保存RDD并将其读回
esRDD.saveAsTextFile("spark-output")
val esRDD2 = sc.textFile("spark-output")
esRDD2.distinct().count()
esRDD2.collect().foreach(println)
esRDD2.take(10).foreach(println)
Run Code Online (Sandbox Code Playgroud)
esRDD2表现如预期.我想知道是否有一个bug,或者我不了解collect/take的行为.或者是因为我在本地运行所有东西.默认情况下,Spark RDD似乎使用5个分区,如"spark-output"文件的part-xxxx文件数所示.这可能就是为什么esRDD.collect()和esRDD.distinct()返回5个唯一记录,而不是其他一些随机数.但那仍然不对.
您应该使用以下代码来初始化:
val sparkConf = new SparkConf().setAppName("Test").setMaster("local").set("spark.serializer", classOf[KryoSerializer].getName)
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3645 次 |
| 最近记录: |