某些记录的 Azure eventhub Kafka org.apache.kafka.common.errors.TimeoutException

Pra*_*7en 1 java apache-kafka spring-boot azure-eventhub

有一个包含 80 到 100 条记录的 ArrayList 尝试将每个单独的记录(POJO,而不是整个列表)流式传输并发送到 Kafka 主题(事件中心)。每小时安排一次 cron 作业,将这些记录 (POJO) 发送到事件中心。

能够看到发送到 eventhub 的消息,但在 3 到 4 次成功运行后出现以下异常(其中包括正在发送的几条消息和几条因以下异常而失败的消息)

    Expiring 14 record(s) for eventhubname: 30125  ms has passed since batch creation plus linger time
Run Code Online (Sandbox Code Playgroud)

以下是使用的 Producer 的配置,

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.ACKS_CONFIG, "1");
    props.put(ProducerConfig.RETRIES_CONFIG, "3");
Run Code Online (Sandbox Code Playgroud)

Message Retention period - 7 Partition - 6 使用spring Kafka(2.2.3)发送标记为@Async写kafka发送的事件的方法

    @Async
    protected void send() {
       kafkatemplate.send(record);
    }
Run Code Online (Sandbox Code Playgroud)

预期 - kafka 不会抛出异常 实际 - org.apache.kafka.common.errors.TimeoutException 被抛出

小智 5

Prakash - 我们已经看到了许多问题,其中尖峰生产者模式看到批处理超时。

这里的问题是生产者有两个可以闲置超过 4 分钟的 TCP 连接 - 在这一点上,Azure 负载平衡器关闭闲置连接。Kafka 客户端不知道连接已关闭,因此它尝试在死连接上发送批处理,该连接超时,此时重试开始。

  • 将 connections.max.idle.ms 设置为 < 4mins - 这允许 Kafka 客户端的网络客户端层优雅地处理生产者发送消息的 TCP 连接的连接关闭
  • 将 metadata.max.age.ms 设置为 < 4mins – 这实际上是生产者元数据 TCP 连接的保持活动状态

请随时联系 Github 上的 EH 产品团队,我们非常擅长响应问题 - https://github.com/Azure/azure-event-hubs-for-kafka