标签: elasticsearch-spark

弹性搜索无法写入所有条目:可能是es被重载

我有一个应用程序,我在其中读取 csv 文件并进行一些转换,然后将它们从 spark 本身推送到弹性搜索。像这样

input.write.format("org.elasticsearch.spark.sql")
              .mode(SaveMode.Append)
              .option("es.resource", "{date}/" + type).save()
Run Code Online (Sandbox Code Playgroud)

我有几个节点,在每个节点中,我运行 5-6 个spark-submit推送到elasticsearch

我经常收到错误

Could not write all entries [13/128] (Maybe ES was overloaded?). Error sample (first [5] error messages):
        rejected execution of org.elasticsearch.transport.TransportService$7@32e6f8f8 on EsThreadPoolExecutor[bulk, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@4448a084[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 451515]]
Run Code Online (Sandbox Code Playgroud)

我的 Elasticsearch 集群有以下统计信息 -

Nodes - 9 (1TB space,
Ram >= 15GB ) More than 8 cores per node …
Run Code Online (Sandbox Code Playgroud)

elasticsearch apache-spark apache-spark-sql elasticsearch-spark

3
推荐指数
1
解决办法
2707
查看次数

从spark写入elasticsearch非常慢

我正在处理一个文本文件,并将转换后的行从Spark应用程序写入弹性搜索

input.write.format("org.elasticsearch.spark.sql")
      .mode(SaveMode.Append)
      .option("es.resource", "{date}/" + dir).save()
Run Code Online (Sandbox Code Playgroud)

这运行速度非常慢,大约需要8分钟才能写入287.9 MB/1513789条记录. 在此输入图像描述

如果网络延迟始终存在,我如何调整spark和elasticsearch设置以使其更快.

我在本地模式下使用spark,有16个内核和64GB RAM.我的elasticsearch集群有一个主节点和3个数据节点,每个节点有16个核心和64GB.

我正在阅读如下文本文件

 val readOptions: Map[String, String] = Map("ignoreLeadingWhiteSpace" -> "true",
  "ignoreTrailingWhiteSpace" -> "true",
  "inferSchema" -> "false",
  "header" -> "false",
  "delimiter" -> "\t",
  "comment" -> "#",
  "mode" -> "PERMISSIVE")
Run Code Online (Sandbox Code Playgroud)

....

val input = sqlContext.read.options(readOptions).csv(inputFile.getAbsolutePath)
Run Code Online (Sandbox Code Playgroud)

elasticsearch apache-spark elasticsearch-5 elasticsearch-spark

1
推荐指数
1
解决办法
1305
查看次数