我想通过Spark in Java更新MongoDb中的特定集合.我正在使用MongoDB Connector for Hadoop在Java中检索Apache Spark到MongoDb的信息并将其保存.
在关注Sampo Niskanen关于通过Spark检索和保存集合到MongoDb的优秀帖子之后,我对更新集合感到困惑.
MongoOutputFormat.java包含一个构造函数,它使用String [] updateKeys,我猜这是指一个可能的键列表,可以在现有集合上进行比较并执行更新.但是,使用Spark的saveAsNewApiHadoopFile()方法和参数MongoOutputFormat.class,我想知道如何使用该更新构造函数.
save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);
Run Code Online (Sandbox Code Playgroud)
在此之前,MongoUpdateWritable.java用于执行集合更新.从我在Hadoop上看到的例子来看,这通常是设置的mongo.job.output.value,在Spark中可能是这样的:
save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, MongoUpdateWritable.class, MongoOutputFormat.class, config);
Run Code Online (Sandbox Code Playgroud)
但是,我仍然想知道如何指定更新密钥MongoUpdateWritable.java.
不可否认,作为一种hacky方式,我将对象的"_id"设置为我的文档的KeyValue,以便在执行保存时,集合将覆盖具有相同KeyValue的文档_id.
JavaPairRDD<BSONObject,?> analyticsResult; //JavaPairRdd of (mongoObject,result)
JavaPairRDD<Object, BSONObject> save = analyticsResult.mapToPair(s -> {
BSONObject o = (BSONObject) s._1;
//for all keys, set _id to key:value_
String id = "";
for (String key …Run Code Online (Sandbox Code Playgroud)