I have an application in which I read CSV files, apply some transformations, and then push the results to Elasticsearch directly from Spark, like this:
input.write.format("org.elasticsearch.spark.sql")
  .mode(SaveMode.Append)
  .option("es.resource", "{date}/" + type).save()
I have several nodes, and on each node I run 5-6 spark-submit jobs that push to Elasticsearch.
I frequently get the following error:
Could not write all entries [13/128] (Maybe ES was overloaded?). Error sample (first [5] error messages):
rejected execution of org.elasticsearch.transport.TransportService$7@32e6f8f8 on EsThreadPoolExecutor[bulk, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@4448a084[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 451515]]
My Elasticsearch cluster has the following stats:
Nodes: 9 (1 TB disk, RAM >= 15 GB, more than 8 cores per node)
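The rejections above mean the bulk thread pool queue (capacity 200) is filling up, so one common direction is to make each Spark job push less aggressively. Below is a minimal sketch, not the original code: it reuses the input DataFrame and the dynamic "{date}" resource from the snippet above, indexType is a hypothetical stand-in for the question's type variable, and the partition count, batch sizes, and retry values are illustrative assumptions built on the standard elasticsearch-hadoop options (es.batch.size.entries, es.batch.size.bytes, es.batch.write.retry.count, es.batch.write.retry.wait).
import org.apache.spark.sql.SaveMode

// Illustrative values only: fewer partitions mean fewer concurrent bulk
// requests, smaller batches mean smaller bulk payloads, and the retry
// options make the connector back off instead of failing when ES rejects.
input
  .coalesce(8)                                   // assumed partition count, tune per cluster
  .write.format("org.elasticsearch.spark.sql")
  .mode(SaveMode.Append)
  .option("es.resource", "{date}/" + indexType)  // indexType stands in for `type` above
  .option("es.batch.size.entries", "500")        // connector default is 1000
  .option("es.batch.size.bytes", "1mb")          // connector default is 1mb
  .option("es.batch.write.retry.count", "6")     // connector default is 3
  .option("es.batch.write.retry.wait", "30s")    // connector default is 10s
  .save()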
I am processing a text file and writing the transformed rows from a Spark application to Elasticsearch:
input.write.format("org.elasticsearch.spark.sql")
  .mode(SaveMode.Append)
  .option("es.resource", "{date}/" + dir).save()
This runs very slowly: it takes about 8 minutes to write 287.9 MB / 1,513,789 records.

Given that network latency will always be present, how can I tune the Spark and Elasticsearch settings to make this faster?
I am using Spark in local mode with 16 cores and 64 GB of RAM. My Elasticsearch cluster has one master node and three data nodes, each with 16 cores and 64 GB of RAM.
I am reading the text file as follows:
val readOptions: Map[String, String] = Map(
  "ignoreLeadingWhiteSpace" -> "true",
  "ignoreTrailingWhiteSpace" -> "true",
  "inferSchema" -> "false",
  "header" -> "false",
  "delimiter" -> "\t",
  "comment" -> "#",
  "mode" -> "PERMISSIVE")
....
val input = sqlContext.read.options(readOptions).csv(inputFile.getAbsolutePath)
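For the throughput problem, the usual levers are the number of concurrent write tasks and the bulk batch size. Below is a minimal sketch along those lines, reusing input and dir from the snippets above; the partition count and batch values are assumptions to experiment with (the connector's documented defaults are 1000 entries / 1mb per bulk request), not verified settings for this cluster.
import org.apache.spark.sql.SaveMode

// A single local CSV often ends up in only a few partitions, so the write
// is driven by only a few concurrent bulk requests. Repartitioning spreads
// the work across more tasks; larger batches reduce per-request overhead.
// All values below are illustrative.
val repartitioned = input.repartition(48)        // assumed count, e.g. ~16 tasks per data node

repartitioned.write.format("org.elasticsearch.spark.sql")
  .mode(SaveMode.Append)
  .option("es.resource", "{date}/" + dir)
  .option("es.batch.size.entries", "5000")       // default 1000 entries per bulk request
  .option("es.batch.size.bytes", "4mb")          // default 1mb per bulk request
  .save()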