我正在研究Spring Boot KafkaConfluence 的示例并运行简单的生产者示例并出现以下错误。我使用 Windows 机器并在 Windows 上安装了 ubunt 14.04 LTS。
注意 - 即使我使用 localhost,它仍然无法通过代码工作。
[2m2021-05-30 21:14:23.916[0;39m [32m INFO[0;39m [35m10300[0;39m [2m---[0;39m [2m[ main][0;39m [36mo.s.i.endpoint.EventDrivenConsumer [0;39m [2m:[0;39m started bean '_org.springframework.integration.errorLogger'
[2m2021-05-30 21:14:23.928[0;39m [32m INFO[0;39m [35m10300[0;39m [2m---[0;39m [2m[ main][0;39m [36mc.e.demo.HelloWorldKafkaApplication [0;39m [2m:[0;39m Started HelloWorldKafkaApplication in 2.619 seconds (JVM running for 3.694)
[2m2021-05-30 21:14:23.931[0;39m [32m INFO[0;39m [35m10300[0;39m [2m---[0;39m [2m[ main][0;39m [36mcom.example.demo.KafkaProducerService [0;39m [2m:[0;39m Producing Message- Key: 1, Value: {"name": "John", "age": 48}
[2m2021-05-30 21:14:23.970[0;39m [32m INFO[0;39m [35m10300[0;39m [2m---[0;39m [2m[ main][0;39m [36mo.a.k.clients.producer.ProducerConfig …Run Code Online (Sandbox Code Playgroud) 我正在测试KAFKA(2.11_2.1.0)代理故障转移。
我有3个节点:
节点0:
broker.id=0
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
Run Code Online (Sandbox Code Playgroud)
节点1:
broker.id=1
listeners=PLAINTEXT://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9093
Run Code Online (Sandbox Code Playgroud)
节点2:
broker.id=2
listeners=PLAINTEXT://localhost:9094
advertised.listeners=PLAINTEXT://localhost:9094
Run Code Online (Sandbox Code Playgroud)
我创建了一个包含3个分区和3个副本的主题。我用config创建了一个生产者:
properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MessageSerializer.class.getName());
properties.put(ProducerConfig.ACKS_CONFIG, "1");
properties.put(ProducerConfig.RETRIES_CONFIG, "3");
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
Run Code Online (Sandbox Code Playgroud)
我使用config创建了3个使用者作为主题:
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, appName);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "3000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class.getName());
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pulSize + "");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000");
Run Code Online (Sandbox Code Playgroud)
场景1:
当node0停止,消费者不消费的消息,但生产者生产的消息,开始的时候node0,一切正常。
方案2:
当node1或node2停止,消费者的消费信息和生产者生产的消息,一切都OK。
为什么故障转移在方案1中不起作用?
当我运行这个命令时,我得到 2 个主题。我知道我创建了测试主题,但我看到了一个名为“__consumer_offsets”的附加主题。顾名思义,它与消费者偏移量有关,但它是如何使用的?
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 1 Leader: 0 Replicas: 0 Isr: 0
*
*
*
Topic: __consumer_offsets Partition: 48 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 49 Leader: 0 Replicas: 0 Isr: 0
Run Code Online (Sandbox Code Playgroud)
这发生在 Kafka 1.1.0 以及为什么有 50 个分区。也在寻找一种方法来禁用它,因为每次我尝试运行“描述”主题时,它首先打印 __consumer_offsets 的 50 个分区,然后打印我的主题。