Spark Dataframe 更新插入到 Elasticsearch

Dan*_*iel 5 scala dataframe elasticsearch apache-spark

我正在使用 Apache Spark DataFrame,我想将数据 upsert 到 Elasticsearch,我发现我可以像这样覆盖它们

val df = spark.read.option("header","true").csv("/mnt/data/akc_breed_info.csv")

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port","443")
  .option("es.net.ssl","true")
  .option("es.nodes", esURL)
  .option("es.mapping.id", index)
  .mode("Overwrite")
  .save("index/dogs")
Run Code Online (Sandbox Code Playgroud)

但到目前为止我注意到这个命令mode("Overwrite")实际上是删除所有现有的重复数据并插入新数据

有没有办法让upsert他们不删除并重新编写它们?因为我需要几乎实时查询这些数据。提前致谢

Dan*_*iel 5

为什么的原因mode("Overwrite")是一个问题是,当你覆盖整个数据帧将删除所有数据与您的数据帧行一次,它看起来像整个索引的比赛是空的,我和我想出如何实际上upsert

这是我的代码

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.nodes.discovery", "false")
  .option("es.nodes.client.only", "false")
  .option("es.net.ssl","true")
  .option("es.mapping.id", index)
  .option("es.write.operation", "upsert")
  .option("es.nodes", esURL)
  .option("es.port", "443")
  .mode("append")
  .save(path)
Run Code Online (Sandbox Code Playgroud)

请注意,您必须放置"es.write.operation", "upert".mode("append")


Con*_*ine 1

尝试设置:

es.write.operation = upsert
Run Code Online (Sandbox Code Playgroud)

这应该执行所需的操作。您可以在https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html中找到更多详细信息