禁用卡夫卡会话超时

whe*_*ler 5 apache-camel apache-kafka

我试图通过Apache Camel的源代码来确定错误的来源。尽管StringDeserializer为使用者配置了a ,但我仍然收到此错误:

org.apache.kafka.common.errors.SerializationException: Can't convert key of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in key.serializer
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
Run Code Online (Sandbox Code Playgroud)

当我尝试逐步了解Camel以找出反序列化的String仍然如何以字节数组结尾时,Camel会继续关闭自身,因为协调器认为Consumer已死:

20:45:04.171 [kafka-coordinator-heartbeat-thread | rtp-creditor-receive-payment] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=rtp-creditor-receive-payment] Marking the coordinator rtp-demo-cluster-kafka-0.rtp-demo-cluster-kafka-brokers.rtp-reference.svc.cluster.local:9092 (id: 2147483647 rack: null) dead
Run Code Online (Sandbox Code Playgroud)

如何完全禁用所有超时,以便可以单步执行源代码而不必担心使用者被标记为已死?

小智 0

虽然您无法禁用 Kafka 集群与其消费者之间的所有超时,但您可以将一些属性修改为很长:

  • group.max.session.timeout.ms- 这是任何消费者的最大会话超时。默认值为五分钟。2100000000在通常名为 的代理属性文件中将其设置为大约最大整数,例如server.properties
  • max.poll.interval.ms- 这类似于会话超时,如果在此时间间隔内没有轮询,则会将消费者标记为死亡。也将其设置为小于的值request.timeout.ms,例如1900000000

在 Apache Camel 中,您需要设置以下属性:

  • consumerRequestTimeoutMs- 这是等待客户端响应的最长时间。将其设置为2000000000.
  • sessionTimeoutMs- 这可能是会话超时,将您的消费者标记为死亡。应将其设置为小于 的值request.timeout.ms。所以,类似的东西1900000000

可能还有更多,可以在这里找到,任何有timeout或都ms值得一看。