Kafka代理故障转移-当第一个节点关闭时,使用者无法工作

lor*_*ghi 5 failover apache-kafka

我正在测试KAFKA(2.11_2.1.0)代理故障转移。

我有3个节点:

节点0:

broker.id=0 
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092 
Run Code Online (Sandbox Code Playgroud)

节点1:

broker.id=1
listeners=PLAINTEXT://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9093
Run Code Online (Sandbox Code Playgroud)

节点2:

broker.id=2
listeners=PLAINTEXT://localhost:9094
advertised.listeners=PLAINTEXT://localhost:9094
Run Code Online (Sandbox Code Playgroud)

我创建了一个包含3个分区和3个副本的主题。我用config创建了一个生产者:

properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MessageSerializer.class.getName());
properties.put(ProducerConfig.ACKS_CONFIG, "1");
properties.put(ProducerConfig.RETRIES_CONFIG, "3");
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
Run Code Online (Sandbox Code Playgroud)

我使用config创建了3个使用者作为主题:

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, appName);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "3000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class.getName());
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pulSize + "");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000");
Run Code Online (Sandbox Code Playgroud)

场景1

node0停止,消费者不消费的消息,但生产者生产的消息,开始的时候node0,一切正常。

方案2

node1node2停止,消费者的消费信息和生产者生产的消息,一切都OK。

为什么故障转移在方案1中不起作用?