nil*_*212 5 hadoop apache-kafka apache-spark spark-streaming kafka-consumer-api
我正在使用DirectAPI在纱线上运行火花流(1.6.1)来读取具有50个分区并在HDFS上写入的Kafka主题的事件.我的批处理间隔为60秒.我收到了大约500K的消息,这些消息在60秒内被处理.
突然火花开始接收15-20万条消息,大约需要5-6分钟处理,批处理间隔为60秒.我已经配置好了"spark.streaming.concurrentJobs=4".
因此,当批处理需要很长时间来处理时,spark会启动并发4个活动任务来处理积压批处理但仍然会在一段时间内批量积压增加,因为批处理间隔对于这样的数据量来说太小了.
我对此几乎没有疑问.
当我开始接收15-20万条消息和处理这些消息的时间大约是5-6分钟,批处理间隔为60秒.当我检查我的HDFS目录时,我看到为每个60秒创建的文件包含50个部分文件,我很困惑,这里我的批处理在5-6分钟内得到处理,然后如何每隔1分钟在HDFS上写文件&'saveAsTextFile'动作是每批只调用一次.所有文件的总记录50个零件文件大约有330万个.
为了处理1500万到2000万条消息的处理,我将批处理间隔配置为8-10分钟,现在火花开始消耗来自Kafka的大约35-40万条消息,并且其处理时间再次开始超过批处理间隔.
我已配置'spark.streaming.kafka.maxRatePerPartition=50'&'spark.streaming.backpressure.enabled=true'.
我认为可能让您感到困惑的一件事是工作长度和频率之间的关系。
根据您的描述,在可用资源的情况下,这项工作最终大约需要 5 分钟才能完成。但是您的批次频率是 1 分钟。
因此,每 1 分钟您就会启动一些需要 5 分钟才能完成的批次。
因此,最终您会期望看到 HDFS 在前几分钟没有收到任何信息,然后您将继续每 1 分钟收到一些信息(但从数据进入时起有 5 分钟的“延迟”)。