Sam*_*ane 14 apache-kafka apache-spark spark-streaming kafka-consumer-api
是否可以限制Kafka消费者为Spark Streaming返回的批次大小?
我问,因为我得到的第一批有数亿条记录,处理和检查它们需要很长时间.
Vla*_*cak 26
我认为您的问题可以通过Spark Streaming Backpressure解决.
检查spark.streaming.backpressure.enabled和spark.streaming.backpressure.initialRate.
默认情况下spark.streaming.backpressure.initialRate是没有设置和 spark.streaming.backpressure.enabled被禁用默认情况下,所以我想火花将采取尽可能多的,因为他可以.
spark.streaming.backpressure.enabled:
这使Spark Streaming能够根据当前的批处理调度延迟和处理时间来控制接收速率,以便系统只接收系统可以处理的速度.在内部,这动态地设置接收器的最大接收速率.这个速率的上限取决于值
spark.streaming.receiver.maxRate,spark.streaming.kafka.maxRatePerPartition如果设置了它们(见下文).
因为你想要控制第一批,或者更具体 - 第一批消息的数量,我认为你需要 spark.streaming.backpressure.initialRate
spark.streaming.backpressure.initialRate:
这是在启用背压机制时每个接收器将接收第一批数据的初始最大接收速率.
当你的Spark工作(分别是Spark工作者)能够处理来自kafka的10000条消息时,这个很好,但是kafka经纪人会给你的工作提供100000条消息.
也许你也有兴趣检查Jeroen van Wilgenburg在他的博客spark.streaming.kafka.maxRatePerPartition上对这些属性的一些研究和建议.
小智 6
除了以上答案。批量大小是3个参数的乘积
batchDuration:将流数据分成几批的时间间隔(以秒为单位)。spark.streaming.kafka.maxRatePerPartition:设置每秒每个分区的最大消息数。当与结合使用时,batchDuration将控制批量大小。您希望将maxRatePerPartition其设置为大(否则将实际上限制您的工作)并且batchDuration要很小。为了更好地说明此产品在启用/禁用反压时的工作方式(将createDirectStream设置为spark.streaming.kafka.maxRatePerPartition)
| 归档时间: |
|
| 查看次数: |
12854 次 |
| 最近记录: |