Exception when connecting to MongoDB from Spark

dim*_*mak 5 hadoop exception mongodb hadoop-streaming apache-spark

While trying to use MongoDB as an input RDD, I get "java.lang.IllegalStateException: not ready" in org.bson.BasicBSONDecoder._decode:

Configuration conf = new Configuration();
conf.set("mongo.input.uri", "mongodb://127.0.0.1:27017/test.input");

JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(conf, MongoInputFormat.class, Object.class, BSONObject.class);

System.out.println(rdd.count());

The exception I get:

14/08/06 09:49:57 INFO rdd.NewHadoopRDD: Input split: MongoInputSplit{URI=mongodb://127.0.0.1:27017/test.input, authURI=null, min={ "_id" : { "$oid" : "53df98d7e4b0a67992b31f8d"}}, max={ "_id" : { "$oid" : "53df98d7e4b0a67992b331b8"}}, query={ }, sort={ }, fields={ }, notimeout=false}
14/08/06 09:49:57 WARN scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException
java.lang.IllegalStateException: not ready
            at org.bson.BasicBSONDecoder._decode(BasicBSONDecoder.java:139)
            at org.bson.BasicBSONDecoder.decode(BasicBSONDecoder.java:123)
            at com.mongodb.hadoop.input.MongoInputSplit.readFields(MongoInputSplit.java:185)
            at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
            at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
            at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:42)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
            at java.lang.reflect.Method.invoke(Method.java:618)
            at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1089)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1962)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2059)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1984)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
            at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:147)
            at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1906)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1865)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
            at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
            at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1156)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:626)
            at java.lang.Thread.run(Thread.java:804)

The complete program output is here.

Environment:

  • Red Hat
  • Spark 1.0.1
  • Hadoop 2.4.1
  • MongoDB 2.4.10
  • Mongo-Hadoop 1.3

小智 5

I think I found the problem: mongodb-hadoop has a "static" modifier on its BSON encoder/decoder instances in core/src/main/java/com/mongodb/hadoop/input/MongoInputSplit.java. When Spark runs in multi-threaded mode, all threads try to deserialize using the same encoder/decoder instances, which predictably has bad results.
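To illustrate, here is a minimal sketch of the kind of pattern described above and of the corresponding fix; the class and member names are assumptions made for the example, not the actual mongo-hadoop source.

import org.bson.BSONObject;
import org.bson.BasicBSONDecoder;

public class SplitDecoding {
    // Problematic pattern (illustrative): a single decoder shared by every task
    // thread in the executor JVM. BasicBSONDecoder keeps internal state while
    // decoding, so concurrent calls can interleave and fail with
    // "IllegalStateException: not ready".
    private static final BasicBSONDecoder SHARED_DECODER = new BasicBSONDecoder();

    // Fix in the spirit of the patch: give each instance its own decoder
    // (or create one per call), so threads never share decoder state.
    private final BasicBSONDecoder decoder = new BasicBSONDecoder();

    public BSONObject readFields(byte[] serializedSplit) {
        return decoder.readObject(serializedSplit);
    }
}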

Patch on my GitHub here (pull request submitted upstream).

I'm now able to run an 8-core multi-threaded Spark→Mongo collection count() from Python!

  • Looks like your PR was accepted. Version 1.3.1 of the mongo-hadoop driver (https://github.com/mongodb/mongo-hadoop/releases) has the change. I was a bit confused because it looks like mongo-hadoop's Maven group id has changed; the new dependency should be `<dependency><groupId>org.mongodb.mongo-hadoop</groupId><artifactId>mongo-hadoop-core</artifactId><version>1.3.1</version></dependency>`. Using this version stopped the error for me. (3)

jua*_*011 4

I ran into the same problem. As a workaround, I abandoned the newAPIHadoopRDD approach and implemented a parallel loading mechanism based on defining intervals over the document ids and then loading each partition in parallel. The idea is to implement the following mongo shell code with the MongoDB Java driver:

// Compute min and max id of the collection
db.coll.find({},{_id:1}).sort({_id: 1}).limit(1)
   .forEach(function(doc) {min_id = doc._id})
db.coll.find({},{_id:1}).sort({_id: -1}).limit(1)
   .forEach(function(doc) {max_id = doc._id})

// Compute id ranges
curr_id = min_id
ranges = []
page_size = 1000
// to avoid the use of Comparable in the Java translation
while(! curr_id.equals(max_id)) {
    prev_id = curr_id
    db.coll.find({_id : {$gte : curr_id}}, {_id : 1})
           .sort({_id: 1})
           .limit(page_size + 1)
           .forEach(function(doc) {
                       curr_id = doc._id
                   })
    ranges.push([prev_id, curr_id])
}

Now we can use the ranges to issue fast queries for fragments of the collection. Note that the last fragment needs to be treated differently, as just a minimum constraint, to avoid losing the last document of the collection.

db.coll.find({_id : {$gte : ranges[1][0], $lt : ranges[1][1]}})
db.coll.find({_id : {$gte : ranges[2][0]}})

I implemented this as a Java method "LinkedList<Range> computeIdRanges(DBCollection coll, int rangeSize)" for a simple Range POJO, then parallelized that list and transformed it with flatMapToPair to generate an RDD similar to the one returned by newAPIHadoopRDD.

List<Range> ranges = computeIdRanges(coll, DEFAULT_RANGE_SIZE);
JavaRDD<Range> parallelRanges = sparkContext.parallelize(ranges, ranges.size());
JavaPairRDD<Object, BSONObject> mongoRDD =
   parallelRanges.flatMapToPair(
     new PairFlatMapFunction<MongoDBLoader.Range, Object, BSONObject>() {
       ...
       BasicDBObject query = range.max.isPresent() ?
           new BasicDBObject("_id", new BasicDBObject("$gte", range.min)
                            .append("$lt", range.max.get()))
         : new BasicDBObject("_id", new BasicDBObject("$gte", range.min));
       ...
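For reference, here is a minimal sketch of what the Range POJO and computeIdRanges could look like with the legacy MongoDB Java driver (DBCollection API). The names and the nullable max field (instead of the Optional-style max used in the fragment above) are assumptions made for illustration, not the exact code behind this answer.

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import java.io.Serializable;
import java.util.LinkedList;

// Simple POJO describing an interval of _id values; max == null marks the
// open-ended last fragment, mirroring the shell code above.
class Range implements Serializable {
    final Object min;   // inclusive lower bound on _id
    final Object max;   // exclusive upper bound, or null for the last fragment
    Range(Object min, Object max) { this.min = min; this.max = max; }
}

class MongoRanges {
    static LinkedList<Range> computeIdRanges(DBCollection coll, int rangeSize) {
        LinkedList<Range> ranges = new LinkedList<Range>();
        Object currId = boundaryId(coll, 1);   // smallest _id
        Object maxId  = boundaryId(coll, -1);  // largest _id
        while (!currId.equals(maxId)) {
            Object prevId = currId;
            // Walk rangeSize documents forward to find the next boundary _id.
            DBCursor cursor = coll
                .find(new BasicDBObject("_id", new BasicDBObject("$gte", currId)),
                      new BasicDBObject("_id", 1))
                .sort(new BasicDBObject("_id", 1))
                .limit(rangeSize + 1);
            for (DBObject doc : cursor) {
                currId = doc.get("_id");
            }
            ranges.add(new Range(prevId, currId));
        }
        // Last fragment: only a minimum constraint, so the final document is kept.
        ranges.add(new Range(currId, null));
        return ranges;
    }

    private static Object boundaryId(DBCollection coll, int direction) {
        return coll.find(new BasicDBObject(), new BasicDBObject("_id", 1))
                   .sort(new BasicDBObject("_id", direction))
                   .limit(1)
                   .next()
                   .get("_id");
    }
}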

You can adjust the size of the ranges and the number of slices used for parallelization to control the granularity of the parallelism.

I hope this helps,

Regards!

Juan Rodríguez Hortalá