Pra*_*7en 1 java apache-kafka spring-boot azure-eventhub
有一个包含 80 到 100 条记录的 ArrayList 尝试将每个单独的记录(POJO,而不是整个列表)流式传输并发送到 Kafka 主题(事件中心)。每小时安排一次 cron 作业,将这些记录 (POJO) 发送到事件中心。
能够看到发送到 eventhub 的消息,但在 3 到 4 次成功运行后出现以下异常(其中包括正在发送的几条消息和几条因以下异常而失败的消息)
Expiring 14 record(s) for eventhubname: 30125 ms has passed since batch creation plus linger time
Run Code Online (Sandbox Code Playgroud)
以下是使用的 Producer 的配置,
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
Run Code Online (Sandbox Code Playgroud)
Message Retention period - 7 Partition - 6 使用spring Kafka(2.2.3)发送标记为@Async写kafka发送的事件的方法
@Async
protected void send() {
kafkatemplate.send(record);
}
Run Code Online (Sandbox Code Playgroud)
预期 - kafka 不会抛出异常 实际 - org.apache.kafka.common.errors.TimeoutException 被抛出
小智 5
Prakash - 我们已经看到了许多问题,其中尖峰生产者模式看到批处理超时。
这里的问题是生产者有两个可以闲置超过 4 分钟的 TCP 连接 - 在这一点上,Azure 负载平衡器关闭闲置连接。Kafka 客户端不知道连接已关闭,因此它尝试在死连接上发送批处理,该连接超时,此时重试开始。
请随时联系 Github 上的 EH 产品团队,我们非常擅长响应问题 - https://github.com/Azure/azure-event-hubs-for-kafka
| 归档时间: |
|
| 查看次数: |
915 次 |
| 最近记录: |