火花流的奇怪延迟

luc*_*kim 5 streaming scala apache-kafka apache-spark spark-streaming

最近在kafka中使用spark Streaming处理数据。

应用程序启动并完成几批后,出现持续延迟。

大多数时候,数据处理在1-5秒内完成。

但经过几个批次后,连续耗时41~45秒,大部分延迟发生在从stage0取数据的区域。

我无意中发现 Kafka request.timemout.ms 设置默认为 40 秒,并将此设置更改为 10 秒。

然后我重新启动应用程序并观察到该批处理在 11 到 15 秒内完成。

实际处理时间为1-5秒。我无法理解这种延迟。

怎么了?

我的环境如下。

Spark流2.1.0(createDirectStream)

卡夫卡:0.10.1

批次间隔:20s

请求超时时间:10s

/////

以下截图是 request.timeout.ms 设置为 8 秒时的图表。

在此输入图像描述

cha*_*les 1

我找到了问题和解决方案:

基本上,当您从执行器读取kafka的每个分区时,Spark Streaming为了提高性能或读取和处理,会将读取的分区内容缓存在内存中。

如果主题的大小太大,缓存可能会溢出,并且当 kafka 连接执行 fetch 到 kafka 时,缓存已满并超时。

解决方案:如果您使用的是spark 2.2.0或更高版本(来自spark文档),这就是解决方案,是spark和cloudera已知的错误:

消费者的缓存默认最大大小为 64。如果您希望处理超过(64 * 执行程序数量)个 Kafka 分区,您可以通过spark.streaming.kafka.consumer.cache.maxCapacity 更改此设置。

如果您想禁用 Kafka 消费者的缓存,可以将 Spark.streaming.kafka.consumer.cache.enabled 设置为 false。可能需要禁用缓存才能解决 SPARK-19185 中描述的问题。一旦 SPARK-19185 得到解决,此属性可能会在 Spark 的更高版本中被删除。

缓存以 topicpartition 和 group.id 为键,因此每次调用 createDirectStream 时使用单独的 group.id。

Spark.streaming.kafka.consumer.cache.enabled 为 false在您的 Spark-Submit 作为参数中,您的 mini-bacth 性能将像超音速飞机一样。