use*_*610 5 elasticsearch apache-spark
使用Elasticsearch with Spark时,更新现有实体的正确方法是什么?
我想要像下面这样的东西:
但是,有几个问题:
如果为了测试,我_id在新值的映射中硬编码,则抛出以下异常:
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest
应如何_id检索,以及如何将其传回Spark?
我在下面包含以下代码,以更好地说明我尝试做的事情:
JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, INDEX_NAME+"/"+TYPE_NAME,
"?source=,field1,field2).values();
Iterator<Map<String, Object>> iter = esRDD.toLocalIterator();
List<Map<String, Object>> listToPersist = new ArrayList<Map<String, Object>>();
while(iter.hasNext()){
Map<String, Object> map = iter.next();
// Get existing values, and do transformation logic
Map<String, Object> newMap = new HashMap<String, Object>();
newMap.put("_id", ??????);
newMap.put("field1", new_value);
listToPersist.add(newMap);
}
JavaRDD javaRDD = jsc.parallelize(ImmutableList.copyOf(listToPersist));
JavaEsSpark.saveToEs(javaRDD, INDEX_NAME+"/"+TYPE_NAME);
Run Code Online (Sandbox Code Playgroud)
理想情况下,我希望更新现有地图,而不是创建新地图.
在使用Spark时,是否有任何示例代码可以显示更新elasticsearch中现有实体的正确方法?
谢谢
尝试将此 upsert 添加到您的 Spark:
.config("es.write.operation", "upsert")
Run Code Online (Sandbox Code Playgroud)
这将允许您向现有文档添加新字段
| 归档时间: |
|
| 查看次数: |
1143 次 |
| 最近记录: |