inn*_*ism 5 apache-spark spark-streaming
我需要为我的应用程序增加每个分区的输入速率,我已经.set("spark.streaming.kafka.maxRatePerPartition",100)用于配置.流持续时间是10秒,所以我希望5*100*10=5000这批次的流程消息.但是,我收到的输入率大约是500.您能否建议任何修改以提高此费率?
流持续时间为10秒,因此我希望此批次的流程为5*100*10 = 5000条消息.
这不是设置意味着什么.这意味着"每一分区多少元件可以具有每批次 ",而不是每秒.我假设你有5个分区,所以你得到5*100 = 500.如果你想要5000,设置maxRatePerPartition为1000.
来自"Apache Kafka的精确一次Spark Streaming"(由Cody编写,直接流方法的作者,强调我的):
对于速率限制,您可以使用Spark配置变量
spark.streaming.kafka.maxRatePerPartition来设置每个分区的每个分区的最大消息数.
在@avrs评论之后,我查看了定义最大速率的代码.事实证明,启发式比博客文章和文档中的说法要复杂一些.
有两个分支.如果在maxRate旁边启用背压,则maxRate是RateEstimator对象计算的当前背压率与用户设置的maxRate 之间的最小值.如果未启用,则将maxRate定义为原样.
现在,在选择费率后,它总是乘以总批次秒,实际上是每秒的费率:
if (effectiveRateLimitPerPartition.values.sum > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
Some(effectiveRateLimitPerPartition.map {
case (tp, limit) => tp -> (secsPerBatch * limit).toLong
})
} else {
None
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8604 次 |
| 最近记录: |