luc*_*kim 5 streaming scala apache-kafka apache-spark spark-streaming
最近在kafka中使用spark Streaming处理数据。
应用程序启动并完成几批后,出现持续延迟。
大多数时候,数据处理在1-5秒内完成。
但经过几个批次后,连续耗时41~45秒,大部分延迟发生在从stage0取数据的区域。
我无意中发现 Kafka request.timemout.ms 设置默认为 40 秒,并将此设置更改为 10 秒。
然后我重新启动应用程序并观察到该批处理在 11 到 15 秒内完成。
实际处理时间为1-5秒。我无法理解这种延迟。
怎么了?
我的环境如下。
Spark流2.1.0(createDirectStream)
卡夫卡:0.10.1
批次间隔:20s
请求超时时间:10s
/////
以下截图是 request.timeout.ms 设置为 8 秒时的图表。
我找到了问题和解决方案:
基本上,当您从执行器读取kafka的每个分区时,Spark Streaming为了提高性能或读取和处理,会将读取的分区内容缓存在内存中。
如果主题的大小太大,缓存可能会溢出,并且当 kafka 连接执行 fetch 到 kafka 时,缓存已满并超时。
解决方案:如果您使用的是spark 2.2.0或更高版本(来自spark文档),这就是解决方案,是spark和cloudera已知的错误:
消费者的缓存默认最大大小为 64。如果您希望处理超过(64 * 执行程序数量)个 Kafka 分区,您可以通过spark.streaming.kafka.consumer.cache.maxCapacity 更改此设置。
如果您想禁用 Kafka 消费者的缓存,可以将 Spark.streaming.kafka.consumer.cache.enabled 设置为 false。可能需要禁用缓存才能解决 SPARK-19185 中描述的问题。一旦 SPARK-19185 得到解决,此属性可能会在 Spark 的更高版本中被删除。
缓存以 topicpartition 和 group.id 为键,因此每次调用 createDirectStream 时使用单独的 group.id。
Spark.streaming.kafka.consumer.cache.enabled 为 false在您的 Spark-Submit 作为参数中,您的 mini-bacth 性能将像超音速飞机一样。
| 归档时间: |
|
| 查看次数: |
2347 次 |
| 最近记录: |