har*_*der 1 elasticsearch apache-spark elasticsearch-5 elasticsearch-spark
我正在处理一个文本文件,并将转换后的行从Spark应用程序写入弹性搜索
input.write.format("org.elasticsearch.spark.sql")
.mode(SaveMode.Append)
.option("es.resource", "{date}/" + dir).save()
Run Code Online (Sandbox Code Playgroud)
这运行速度非常慢,大约需要8分钟才能写入287.9 MB/1513789条记录.

如果网络延迟始终存在,我如何调整spark和elasticsearch设置以使其更快.
我在本地模式下使用spark,有16个内核和64GB RAM.我的elasticsearch集群有一个主节点和3个数据节点,每个节点有16个核心和64GB.
我正在阅读如下文本文件
val readOptions: Map[String, String] = Map("ignoreLeadingWhiteSpace" -> "true",
"ignoreTrailingWhiteSpace" -> "true",
"inferSchema" -> "false",
"header" -> "false",
"delimiter" -> "\t",
"comment" -> "#",
"mode" -> "PERMISSIVE")
Run Code Online (Sandbox Code Playgroud)
....
val input = sqlContext.read.options(readOptions).csv(inputFile.getAbsolutePath)
Run Code Online (Sandbox Code Playgroud)
首先,让我们从您的应用程序中发生的事情开始.Apache Spark正在读取1个(不是那么大)csv压缩文件.因此,第一个spark将花费时间解压缩数据并在写入之前对其进行扫描elasticsearch.
这将创建一个Dataset/ DataFrame 一个分区(由您df.rdd.getNumPartitions在评论中提到的结果确认).
repartition在写入数据之前,一个直接的解决方案是读取和缓存数据elasticsearch.现在我不确定您的数据是什么样的,因此决定分区数量是您的基准.
val input = sqlContext.read.options(readOptions)
.csv(inputFile.getAbsolutePath)
.repartition(100) // 100 is just an example
.cache
Run Code Online (Sandbox Code Playgroud)
我不确定您的应用程序会带来多大好处,因为我相信可能存在其他瓶颈(网络IO,ES的磁盘类型).
PS:在构建ETL之前,我应该将csv转换为镶木地板文件.这里有真正的性能提升.(个人意见和基准)
另一种可能的优化是调整es.batch.size.entrieselasticsearch-spark连接器的设置.默认值为1000.
设置此参数时需要小心,因为您可能会使elasticsearch超载.我强烈建议您在这里查看可用的配置.
我希望这有帮助 !
| 归档时间: |
|
| 查看次数: |
1305 次 |
| 最近记录: |