Elasticsearch + Spark: writing JSON with a custom document _id

Nik*_*iev 1 scala elasticsearch apache-spark elasticsearch-hadoop

I am trying to write a collection of objects from Spark into Elasticsearch. I have to satisfy two requirements:

  1. The documents are already serialized as JSON and should be written as-is
  2. The Elasticsearch document _id should be supplied

This is what I have tried so far.

saveJsonToEs()

I tried to use saveJsonToEs() like this (the serialized documents contain a field _id with the desired Elasticsearch id):

import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark

val rdd: RDD[String] = job.map{ r => r.toJson() }

val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "_id"),
  ("es.mapping.exclude", "_id")
)

EsSpark.saveJsonToEs(rdd, cfg)

The elasticsearch-hadoop library throws this exception:

Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
    at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
    at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)
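The message points at the root cause: when writing pre-serialized JSON, the connector sends the bytes as-is and cannot rewrite the document, so field exclusion is unsupported in that mode. For contrast, a minimal sketch (my own, not from the original job) of exclusion working when the connector serializes structured records itself via saveToEs():

import org.elasticsearch.spark.rdd.EsSpark

// Structured records (Maps here), so the connector performs the serialization
// itself and can both extract the id and drop the field from the document body.
// `sc` is assumed to be the SparkContext.
val docs = sc.makeRDD(Seq(Map("_id" -> "blah", "foo" -> "bar")))

val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "_id"),
  ("es.mapping.exclude", "_id")
)

EsSpark.saveToEs(docs, cfg)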

If I remove es.mapping.exclude but keep es.mapping.id and send JSON with _id inside each document (like {"_id":"blah",...}):

val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "_id")
)

EsSpark.saveJsonToEs(rdd, cfg)

I get this error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 84.0 failed 4 times, most recent failure: Lost task 15.3 in stage 84.0 (TID 628, 172.31.35.69, executor 1): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [172.31.30.184:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
...

When I try to send this id as a different field (like {"superID":"blah",...}):

 val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "superID")
)

EsSpark.saveJsonToEs(rdd, cfg)

it fails to extract that field:

17/12/20 15:15:38 WARN TaskSetManager: Lost task 8.0 in stage 84.0 (TID 586, 172.31.33.56, executor 0): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]
    at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:106)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:161)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

When I remove es.mapping.id and es.mapping.exclude from the configuration, it works, but the document ids are generated by Elasticsearch (which violates requirement 2):

val rdd: RDD[String] = job.map{ r => r.toJson() }

val cfg = Map(
  ("es.resource", "myindex/mytype")
)

EsSpark.saveJsonToEs(rdd, cfg)

saveToEsWithMeta()

There is another function, saveToEsWithMeta(), that allows providing the _id and other metadata for insertion. It solves requirement 2, but fails requirement 1.

val rdd: RDD[(String, String)] = job.map{
  r => r._id -> r.toJson()
}

val cfg = Map(
  ("es.resource", "myindex/mytype")
)

EsSpark.saveToEsWithMeta(rdd, cfg)

In fact, Elasticsearch cannot even parse what elasticsearch-hadoop sends:

Caused by: org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [<es_host>:9200] returned Bad Request(400) - failed to parse; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
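For reference, here is a minimal sketch (my assumption of typical usage, not from the original job) of how saveToEsWithMeta() is normally fed: the value side is a structure that the connector serializes itself, which satisfies requirement 2 but gives up requirement 1:

import org.elasticsearch.spark.rdd.EsSpark

// The key of each pair becomes the document _id; the value is serialized by
// the connector itself, so a pre-serialized JSON String is not accepted as-is.
val pairs = sc.makeRDD(Seq(
  ("id-1", Map("foo" -> "bar")),
  ("id-2", Map("foo" -> "baz"))
))

EsSpark.saveToEsWithMeta(pairs, "myindex/mytype")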

Question

Is it possible to write a collection of (documentID, serializedDocument) from Spark into Elasticsearch (using elasticsearch-hadoop)?

P.S. I am using Elasticsearch 5.6.3 and Spark 2.1.1.

Nik*_*iev 6

In the end I found the problem: it was a typo in the config.

[JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]

It was looking for a field superId, but the documents only had superID (note the case). In the question it is also a bit misleading, because in the code it appears as "es.mapping.id", "superID" (which was not what the actual code contained).

The actual solution was as Levi Ramsey suggested:

import org.elasticsearch.spark.rdd.EsSpark

val json = """{"foo":"bar","superID":"deadbeef"}"""

val rdd = sc.makeRDD(Seq(json))  // sc: the SparkContext
val cfg = Map(
  ("es.mapping.id", "superID"),
  ("es.resource", "myindex/mytype")
)
EsSpark.saveJsonToEs(rdd, cfg = cfg)

The difference is that es.mapping.id cannot be _id (as was pointed out in the original post: _id is a metadata field, and Elasticsearch does not accept it inside the document body; it has to be supplied via the index API request parameters).
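To make that concrete, with the plain index API the id travels in the request path, never in the body. A hedged sketch using the scalaj-http client (an assumed dependency, not part of the original post):

import scalaj.http.Http

// Equivalent of `PUT myindex/mytype/blah` — the document id "blah" is part of
// the URL (an index API request parameter), while the body holds only the fields
Http("http://localhost:9200/myindex/mytype/blah")
  .header("Content-Type", "application/json")
  .put("""{"foo":"bar"}""")
  .asString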

Of course it means that the new field superID has to be added to the mapping (unless the mapping is dynamic). If storing the extra field in the index is a burden, one should also (see the sketch after this list):

  • exclude it from the _source
  • and disable its indexing
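A minimal sketch of such a mapping for Elasticsearch 5.x, created again via scalaj-http (the index URL and the keyword type for superID are assumptions for illustration):

import scalaj.http.Http

// ES 5.x mapping: keep superID out of _source and do not index it, so the
// field is only used client-side by the connector to extract the document id
val mapping =
  """{
    |  "mappings": {
    |    "mytype": {
    |      "_source": { "excludes": ["superID"] },
    |      "properties": {
    |        "superID": { "type": "keyword", "index": false }
    |      }
    |    }
    |  }
    |}""".stripMargin

val response = Http("http://localhost:9200/myindex")
  .header("Content-Type", "application/json")
  .put(mapping)
  .asString

println(s"${response.code} ${response.body}")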

Many thanks to Alex Savitsky for pointing in the right direction.