小编Rod*_*ira的帖子

Spring kafka 和 Kafka 集群

我已经在集群中配置了 3 个 kafka,我正在尝试与 spring-kafka 一起使用。

但是在我杀死 kafka 领导者之后,我无法将其他消息发送到队列中。

我将 spring.kafka.bootstrap-servers 属性设置为:“kafka-1:9092;kafka-2:9093,kafka-3:9094”以及我的主机文件中的所有名称。

卡夫卡版本 0.10

有人知道如何正确配置吗?

编辑

我测试了一件事,发生了奇怪的行为。当我启动服务时,我向主题发送一条消息(强制创建)

代码:

@Bean
public KafkaSyncListener synchronousListener(MessageSender sender, KafkaProperties prop) {
    sender.send(prop.getSynchronousTopic(), "Message to force create the topic! Run, Forrest, Run!");
    return new KafkaSyncListener();
}
Run Code Online (Sandbox Code Playgroud)

所以,这次我没有启动 kafka-1 服务器(只是其他服务器)并且发生了异常:

org.springframework.kafka.core.KafkaProducerException:发送失败;嵌套异常是 org.apache.kafka.common.errors.TimeoutException:60000 毫秒后无法更新元数据。

似乎 spring-kafka 只是尝试在第一个引导服务器上连接。我正在使用 spring-kafka 1.3.5.RELEASE 和 kafka 0.10.1.1

编辑 2

我已经做了你做的测试。当我删除领导者已更改的第一个 docker 容器 (kafka-1) 时,也会发生同样的情况。所以,我的消费者(弹簧服务)无法消费消息。但是当我再次启动 kafka-1 时,该服务会收到所有消息我的消费者 ConcurrentKafkaListenerContainerFactory:

{
  key.deserializer=class
  org.apache.kafka.common.serialization.IntegerDeserializer,
  value.deserializer=class
  org.apache.kafka.common.serialization.StringDeserializer,
  max.poll.records=500,
  group.id=mongo-adapter-service,
  ssl.keystore.location=/certs/kafka.keystore.jks,
  bootstrap.servers=[kafka-2:9093, kafka-1:9092, kafka-3:9094],
auto.commit.interval.ms=100,
security.protocol=SSL,
max.request.size=5242880,
ssl.truststore.location=/certs/kafka.keystore.jks,
auto.offset.reset=earliest …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-kafka

6
推荐指数
1
解决办法
4140
查看次数

标签 统计

apache-kafka ×1

spring-kafka ×1