fetch-min-size 和 max-poll-records sping kafka 配置无法按预期工作

Dis*_*ble 2 apache-kafka spring-boot kafka-consumer-api spring-kafka

我正在使用 spring kafka 开发一个 Spring boot 应用程序,该应用程序侦听 kafka 的单个主题,然后隔离各个类别的记录,从中创建一个 json 文件并将其上传到 AWS S3。

我在 Kafka 主题中收到大量数据,我需要确保 json 文件分块得足够大,以限制上传到 S3 的 json 数量。

以下是我application.yml对 kafka 消费者的配置。

spring:
  kafka:
    consumer:
      group-id: newton
      auto-offset-reset: earliest
      fetch-max-wait: 
        seconds: 1 
      fetch-min-size: 500000000
      max-poll-records: 50000000
      value-deserializer: com.forwarding.application.consumer.model.deserializer.MeasureDeserializer
Run Code Online (Sandbox Code Playgroud)

我创建了一个监听器来连续阅读该主题。

即使使用上述配置,我在控制台中收到的记录如下:

   2019-03-27T15:25:56.02+0530 [APP/PROC/WEB/0] OUT 2019-03-27 09:55:56.024  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 56. No Of measures: 60
   2019-03-27T15:25:56.21+0530 [APP/PROC/WEB/2] OUT 2019-03-27 09:55:56.210  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 80. No Of measures: 96
   2019-03-27T15:25:56.56+0530 [APP/PROC/WEB/0] OUT 2019-03-27 09:55:56.560  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 76. No Of measures: 39
   2019-03-27T15:25:56.73+0530 [APP/PROC/WEB/2] OUT 2019-03-27 09:55:56.732  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 77. No Of measures: 66
Run Code Online (Sandbox Code Playgroud)

谁能告诉我可以配置什么来根据配置获取收到的记录application.yml

Gar*_*ell 7

我刚刚复制了您的配置(除了最大等待 - 请参阅我使用的语法)并且它工作正常......

spring:
  kafka:
    consumer:
      group-id: newton
      auto-offset-reset: earliest
      fetch-max-wait: 1s
      fetch-min-size: 500000000
      max-poll-records: 50000000
Run Code Online (Sandbox Code Playgroud)
2019-03-27 13:43:55.454  INFO 98982 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 1000
    fetch.min.bytes = 500000000
    group.id = newton
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 50000000
    ...
Run Code Online (Sandbox Code Playgroud)

您可以使用 属性来设置不直接支持作为引导属性的任意属性...properties

例如

spring:
  kafka:
    consumer:
      properties:
        max.poll.interval.ms: 300000
Run Code Online (Sandbox Code Playgroud)

或者

spring:
  kafka:
    consumer:
      properties:
         max:
           poll:
             interval:
               ms: 300000
Run Code Online (Sandbox Code Playgroud)

文档在这里

自动配置支持的属性如附录 A,通用应用程序属性所示。请注意,在大多数情况下,这些属性(连字符或驼峰命名法)直接映射到 Apache Kafka 点分属性。有关详细信息,请参阅 Apache Kafka 文档。

这些属性中的前几个适用于所有组件(生产者、消费者、管理员和流),但如果您希望使用不同的值,则可以在组件级别指定。Apache Kafka 将属性的重要性指定为“高”、“中”或“低”。Spring Boot 自动配置支持所有高重要性属性、一些选定的中和低属性以及任何没有默认值的属性。

只有 Kafka 支持的属性的子集可以直接通过 KafkaProperties 类获得。如果您希望使用不直接支持的附加属性来配置生产者或消费者,请使用以下属性:

spring.kafka.properties.prop.one=first

spring.kafka.admin.properties.prop.two=second

spring.kafka.consumer.properties.prop.three=third

spring.kafka.producer.properties.prop.four=fourth

spring.kafka.streams.properties.prop.five=fifth