Elasticearch和Spark:更新现有实体

use*_*610 5 elasticsearch apache-spark

使用Elasticsearch with Spark时,更新现有实体的正确方法是什么?

我想要像下面这样的东西:

  1. 将现有数据作为地图获取.
  2. 创建一个新地图,并使用更新的字段填充它.
  3. 坚持新地图.

但是,有几个问题:

  1. 返回的字段列表不能包含_id,因为它不是源的一部分.
  2. 如果为了测试,我_id在新值的映射中硬编码,则抛出以下异常:

    org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest

应如何_id检索,以及如何将其传回Spark?

我在下面包含以下代码,以更好地说明我尝试做的事情:

JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, INDEX_NAME+"/"+TYPE_NAME, 
"?source=,field1,field2).values();

Iterator<Map<String, Object>> iter = esRDD.toLocalIterator();
List<Map<String, Object>> listToPersist = new ArrayList<Map<String, Object>>();
while(iter.hasNext()){
   Map<String, Object> map = iter.next();
   // Get existing values, and do transformation logic

   Map<String, Object> newMap = new HashMap<String, Object>();
   newMap.put("_id", ??????);
   newMap.put("field1", new_value);
   listToPersist.add(newMap);
}
JavaRDD javaRDD = jsc.parallelize(ImmutableList.copyOf(listToPersist));
JavaEsSpark.saveToEs(javaRDD, INDEX_NAME+"/"+TYPE_NAME); 
Run Code Online (Sandbox Code Playgroud)

理想情况下,我希望更新现有地图,而不是创建新地图.

在使用Spark时,是否有任何示例代码可以显示更新elasticsearch中现有实体的正确方法?

谢谢

ale*_*ool 1

尝试将此 upsert 添加到您的 Spark:

.config("es.write.operation", "upsert")
Run Code Online (Sandbox Code Playgroud)

这将允许您向现有文档添加新字段