我正在使用 spring kafka 开发一个 Spring boot 应用程序,该应用程序侦听 kafka 的单个主题,然后隔离各个类别的记录,从中创建一个 json 文件并将其上传到 AWS S3。
我在 Kafka 主题中收到大量数据,我需要确保 json 文件分块得足够大,以限制上传到 S3 的 json 数量。
以下是我application.yml对 kafka 消费者的配置。
spring:
kafka:
consumer:
group-id: newton
auto-offset-reset: earliest
fetch-max-wait:
seconds: 1
fetch-min-size: 500000000
max-poll-records: 50000000
value-deserializer: com.forwarding.application.consumer.model.deserializer.MeasureDeserializer
Run Code Online (Sandbox Code Playgroud)
我创建了一个监听器来连续阅读该主题。
即使使用上述配置,我在控制台中收到的记录如下:
2019-03-27T15:25:56.02+0530 [APP/PROC/WEB/0] OUT 2019-03-27 09:55:56.024 INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl : Time taken(ms) 56. No Of measures: 60
2019-03-27T15:25:56.21+0530 [APP/PROC/WEB/2] OUT 2019-03-27 09:55:56.210 INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl : Time taken(ms) 80. No Of measures: …Run Code Online (Sandbox Code Playgroud)