如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?

杨嘉辰*_*杨嘉辰 6 apache-kafka pyspark

我想限制从 kafka 获取数据时的速率。我的代码看起来像:

df = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers",'...')\
        .option("subscribe",'A') \
        .option("startingOffsets",'''{"A":{"0":200,"1":200,"2":200}}''') \
        .option("endingOffsets",'''{"A":{"0":400,"1":400,"2":400}}''') \
        .option("maxOffsetsPerTrigger",20) \
        .load() \
        .cache()
Run Code Online (Sandbox Code Playgroud)

然而,当我打电话时df.count(),结果是 600。我期望的是 20。有谁知道为什么“maxOffsetsPerTrigger”不起作用。

dbu*_*osp 7

每个分区 (0, 1, 2) 包含 200 条记录,总数为 600 条记录。

正如你在这里看到的:

使用 maxOffsetsPerTrigger 选项来限制每个触发器要获取的记录数。

这意味着对于每个触发器或获取过程,Kafka 将获取 20 条记录,但总的来说,您仍将获取配置中设置的总记录(每个分区 200 条)。

  • 对就这样!通常,您只能使用“max.poll.records”限制从 fetch/poll 操作获取的记录数,如果您更喜欢使用字节,请使用“replica.fetch.max.bytes”或“replica.fetch.min.bytes” 。 (2认同)
  • 那么设置 `max.poll.records` 和 `maxOffsetsPerTrigger` 之间有什么区别? (2认同)