Nik*_*iev 1 scala elasticsearch apache-spark elasticsearch-hadoop
我正在尝试从 Spark 在 Elasticsearch 中编写对象集合。我必须满足两个要求:
_id应提供Elasticsearch文档这是我到目前为止所尝试的。
saveJsonToEs()我尝试saveJsonToEs()像这样使用(序列化文档包含_id具有所需 Elasticsearch ID 的字段):
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "_id"),
("es.mapping.exclude", "_id")
)
EsSpark.saveJsonToEs(rdd, cfg)
Run Code Online (Sandbox Code Playgroud)
但elasticsearch-hadoop图书馆给出了这个例外:
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)
Run Code Online (Sandbox Code Playgroud)
如果我删除es.mapping.exclude但保留es.mapping.id并发送带有内部的 JSON _id(如{"_id":"blah",...})
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "_id")
)
EsSpark.saveJsonToEs(rdd, cfg)
Run Code Online (Sandbox Code Playgroud)
我收到此错误:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 84.0 failed 4 times, most recent failure: Lost task 15.3 in stage 84.0 (TID 628, 172.31.35.69, executor 1): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [172.31.30.184:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
...
Run Code Online (Sandbox Code Playgroud)
当我尝试将此 id 作为不同的字段发送时(例如{"superID":"blah",...":
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "superID")
)
EsSpark.saveJsonToEs(rdd, cfg)
Run Code Online (Sandbox Code Playgroud)
它无法提取该字段:
17/12/20 15:15:38 WARN TaskSetManager: Lost task 8.0 in stage 84.0 (TID 586, 172.31.33.56, executor 0): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]
at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:106)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:161)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
Run Code Online (Sandbox Code Playgroud)
当我从配置中删除es.mapping.id和时,它可以工作,但文档 ID 是由 Elasticsearch 生成的(这违反了要求 2):es.mapping.exclude
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
("es.resource", "myindex/mytype"),
)
EsSpark.saveJsonToEs(rdd, cfg)
Run Code Online (Sandbox Code Playgroud)
saveToEsWithMeta()还有另一个函数可以提供用于插入的_id其他元数据:saveToEsWithMeta()它可以解决要求 2,但无法解决要求 1。
val rdd: RDD[(String, String)] = job.map{
r => r._id -> r.toJson()
}
val cfg = Map(
("es.resource", "myindex/mytype"),
)
EsSpark.saveToEsWithMeta(rdd, cfg)
Run Code Online (Sandbox Code Playgroud)
事实上,Elasticsearch 甚至无法解析elasticsearch-hadoop发送的内容:
Caused by: org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [<es_host>:9200] returned Bad Request(400) - failed to parse; Bailing out..
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
Run Code Online (Sandbox Code Playgroud)
(documentID, serializedDocument)是否可以将Spark的集合写入 Elasticsearch(使用elasticsearch-hadoop)?
PS我使用Elasticsearch 5.6.3和Spark 2.1.1。
最后我发现了问题:这是配置中的拼写错误。
[JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]
Run Code Online (Sandbox Code Playgroud)
它正在寻找一个字段superID,但只有superID(注意这种情况)。在这个问题中,它也有点误导,因为在代码中它看起来像"es.mapping.id", "superID"(这是不正确的)。
实际的解决方案就像Levi Ramsey建议的那样:
val json = """{"foo":"bar","superID":"deadbeef"}"""
val rdd = spark.makeRDD(Seq(json))
val cfg = Map(
("es.mapping.id", "superID"),
("es.resource", "myindex/mytype")
)
EsSpark.saveJsonToEs(rdd, cfg = cfg)
Run Code Online (Sandbox Code Playgroud)
不同之处在于es.mapping.id不能_id(正如原始帖子中所指出的那样,_id元数据是元数据,而 Elasticsearch 不接受它)。
当然,这意味着新字段superID应该添加到映射中(除非映射是动态的)。如果在索引中存储额外的字段是一种负担,那么还应该:
非常感谢亚历克斯·萨维茨基指出了正确的方向。
| 归档时间: |
|
| 查看次数: |
7289 次 |
| 最近记录: |