小编dyl*_*ini的帖子

使用Mongo-Hadoop连接器通过Apache Spark更新MongoDb中的集合

我想通过Spark in Java更新MongoDb中的特定集合.我正在使用MongoDB Connector for Hadoop在Java中检索Apache Spark到MongoDb的信息并将其保存.

在关注Sampo Niskanen关于通过Spark检索和保存集合到MongoDb的优秀帖子之后,我对更新集合感到困惑.

MongoOutputFormat.java包含一个构造函数,它使用String [] updateKeys,我猜这是指一个可能的键列表,可以在现有集合上进行比较并执行更新.但是,使用Spark的saveAsNewApiHadoopFile()方法和参数MongoOutputFormat.class,我想知道如何使用该更新构造函数.

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);
Run Code Online (Sandbox Code Playgroud)

在此之前,MongoUpdateWritable.java用于执行集合更新.从我在Hadoop上看到的例子来看,这通常是设置的mongo.job.output.value,在Spark中可能是这样的:

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, MongoUpdateWritable.class, MongoOutputFormat.class, config);
Run Code Online (Sandbox Code Playgroud)

但是,我仍然想知道如何指定更新密钥MongoUpdateWritable.java.

不可否认,作为一种hacky方式,我将对象的"_id"设置为我的文档的KeyValue,以便在执行保存时,集合将覆盖具有相同KeyValue的文档_id.

JavaPairRDD<BSONObject,?> analyticsResult; //JavaPairRdd of (mongoObject,result)
JavaPairRDD<Object, BSONObject> save = analyticsResult.mapToPair(s -> {
    BSONObject o = (BSONObject) s._1;

    //for all keys, set _id to key:value_
    String id = "";
    for (String key …
Run Code Online (Sandbox Code Playgroud)

java mongodb apache-spark rdd

4
推荐指数
1
解决办法
3283
查看次数

标签 统计

apache-spark ×1

java ×1

mongodb ×1

rdd ×1