Messages lost when a Kafka node restarts

Asked by Aka*_*rot · 7 votes · tagged: apache-kafka

I am running a 3-node Kafka cluster on AWS.

Kafka version: 0.10.2.1
ZooKeeper version: 3.4

While running some stability tests, I noticed that messages get lost when I shut down the leader node.

These are the steps to reproduce the problem:

Create a topic with a replication factor of 3, which should make the data available on all 3 nodes:

~ $ docker run --rm -ti ches/kafka bin/kafka-topics.sh --zookeeper "10.2.31.10:2181,10.2.31.74:2181,10.2.31.138:2181" --create --topic stackoverflow --replication-factor 3 --partitions 20
Created topic "stackoverflow".
~ $ docker run --rm -ti ches/kafka bin/kafka-topics.sh --zookeeper "10.2.31.10:2181,10.2.31.74:2181,10.2.31.138:2181" --describe --topic stackoverflow
Topic:stackoverflow    PartitionCount:20    ReplicationFactor:3    Configs:
    Topic: stackoverflow    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 1    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: stackoverflow    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: stackoverflow    Partition: 3    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: stackoverflow    Partition: 4    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: stackoverflow    Partition: 5    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: stackoverflow    Partition: 6    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 7    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: stackoverflow    Partition: 8    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: stackoverflow    Partition: 9    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: stackoverflow    Partition: 10    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: stackoverflow    Partition: 11    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: stackoverflow    Partition: 12    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 13    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: stackoverflow    Partition: 14    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: stackoverflow    Partition: 15    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: stackoverflow    Partition: 16    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: stackoverflow    Partition: 17    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: stackoverflow    Partition: 18    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 19    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1

Start producing to that topic with the following code:

import time
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['10.2.31.10:9092' ,'10.2.31.74:9092' ,'10.2.31.138:9092'])

try:
    count = 0
    while True:
        # message values must be bytes (b'...' keeps this Python 3 compatible)
        producer.send('stackoverflow', b'message')
        producer.flush()
        count += 1
        time.sleep(1)
except KeyboardInterrupt:
    print("Sent %s messages." % count)

At this point I kill one of the machines and wait for it to rejoin the cluster.

Once it is back, I stop the producer and consume all messages from the topic:

from kafka import KafkaConsumer

consumer = KafkaConsumer('stackoverflow',
                            bootstrap_servers=['10.2.31.10:9092' ,'10.2.31.74:9092' ,'10.2.31.138:9092'],
                            auto_offset_reset='earliest',
                            enable_auto_commit=False)
try:
    count = 0
    for message in consumer:
        count += 1
        print(message)
except KeyboardInterrupt:
    print("Received %s messages." % count)

Two of the sent messages were lost. The producer did not return any errors.

kafka $ python producer.py
Sent 762 messages.

kafka $ python consumer.py
Received 760 messages.

I am new to Kafka, so I would really appreciate any ideas for debugging this further, or instructions for making the cluster more resilient.

Thanks for your help!

Answered by Aka*_*rot · 2 votes

In the end I believe the cause of the lost messages was an insufficient number of retries. After reading some blog posts about highly available Kafka, I noticed that people recommend very high values for the `retries` parameter.

In Python this would be:

import sys

producer = KafkaProducer(bootstrap_servers=[...], retries=sys.maxsize)  # sys.maxint on Python 2

I ran the test again and confirmed that no messages were lost.
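Worth noting that `retries` only covers retriable errors. For stronger durability guarantees, a common complement is `acks='all'` on the producer together with the topic-level broker setting `min.insync.replicas=2`, so the leader only acknowledges a write after a follower also has it. A sketch of such a configuration, assuming the broker addresses from the question (the exact values are illustrative, not a tested recipe):

```python
import sys

# Durability-oriented producer settings (values are illustrative assumptions):
producer_config = dict(
    bootstrap_servers=['10.2.31.10:9092', '10.2.31.74:9092', '10.2.31.138:9092'],
    acks='all',           # leader acks only once all in-sync replicas have the write
    retries=sys.maxsize,  # retry retriable errors indefinitely (sys.maxint on Python 2)
)

# producer = KafkaProducer(**producer_config)  # would connect to the brokers
```

`min.insync.replicas` is set on the broker or per topic (e.g. via `kafka-topics.sh --alter ... --config min.insync.replicas=2`); with `acks='all'` it turns "not enough replicas" into a producer-side error rather than silent loss.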