为createDirectStream设置spark.streaming.kafka.maxRatePerPartition

inn*_*ism 5 apache-spark spark-streaming

我需要为我的应用程序增加每个分区的输入速率,我已经.set("spark.streaming.kafka.maxRatePerPartition",100)用于配置.流持续时间是10秒,所以我希望5*100*10=5000这批次的流程消息.但是,我收到的输入率大约是500.您能否建议任何修改以提高此费率?

Yuv*_*kov 9

流持续时间为10秒,因此我希望此批次的流程为5*100*10 = 5000条消息.

这不是设置意味着什么.这意味着"每一分区多少元件可以具有每批次 ",而不是每秒.我假设你有5个分区,所以你得到5*100 = 500.如果你想要5000,设置maxRatePerPartition为1000.

来自"Apache Kafka的精确一次Spark Streaming"(由Cody编写,直接流方法的作者,强调我的):

对于速率限制,您可以使用Spark配置变量 spark.streaming.kafka.maxRatePerPartition来设置每个分区的每个分区的最大消息.

编辑:

在@avrs评论之后,我查看了定义最大速率的代码.事实证明,启发式比博客文章和文档中的说法要复杂一些.

有两个分支.如果在maxRate旁边启用背压,则maxRate是RateEstimator对象计算的当前背压率与用户设置的maxRate 之间的最小值.如果未启用,则将maxRate定义为原样.

现在,在选择费率后,它总是乘以总批次秒,实际上是每秒的费率:

if (effectiveRateLimitPerPartition.values.sum > 0) {
  val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
  Some(effectiveRateLimitPerPartition.map {
    case (tp, limit) => tp -> (secsPerBatch * limit).toLong
  })
} else {
  None
}
Run Code Online (Sandbox Code Playgroud)

  • @YuvalItzchakov 我发现了这个问题。spark.streaming.kafka.maxRatePerPartition 实际上实现为批处理流的每个分区*每秒*的速率。因此,如果 maxRatePerPartition 为 500,批处理间隔为 10 秒,则每批处理的最大速率为 5,000。因此,我建议将此参数重命名为 maxRatePerPartitionPerSecond 以使其更清晰/ (3认同)
  • @Michael我不确定你是否读过我的答案,但我会引用:*现在,在选择费率后,它总是乘以总批次秒数,实际上是每秒的费率*. (2认同)