当没有消费者连接时,Kafka 代理可以保留消息吗?

Bre*_*ett 3 java messagebroker apache-kafka

我正在尝试构建一个发布/订阅应用程序,并且正在探索最好的工具。我目前正在研究Kafka,并且已经运行了一个小演示应用程序。但是,我遇到了一个概念问题。

我有一个生产者(Java代码):

    String topicName = "MyTopic;
    String key = "MyKey";

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092,localhost:9093");
    props.put("acks", "all");
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    Producer<String, byte[]> producer = new KafkaProducer <String, byte[]>(props);

    byte[] data = <FROM ELSEWHERE>;
    ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(topicName, key, data);

    try {
        RecordMetadata result = producer.send(record).get();
    }
    catch (Exception e) {
        // Nothing for now
    }
    producer.close();
Run Code Online (Sandbox Code Playgroud)

当我通过 Kakfa 命令行工具启动消费者时:

kafka-console-consumer --bootstrap-server localhost:9092 --topic MyTopic
Run Code Online (Sandbox Code Playgroud)

然后我执行生产者代码,我看到数据消息显示在我的消费者终端上。

但是,如果我在执行生产者之前没有运行消费者,则该消息将显示为“丢失”。当我启动消费者时(执行生产者之后),消费者终端中什么也没有出现。

有谁知道是否可以让 Kafka 代理在没有连接消费者的情况下保留消息?如果是这样,怎么办?

ame*_*tic 7

附加--from-beginning到控制台消费者命令以使其从最早的偏移量开始消费。这实际上是关于由 config 控制的偏移重置策略auto.offset.reset。这个配置的含义如下:

当 Kafka 中没有初始偏移量或者当前偏移量在服务器上不再存在时(例如因为该数据已被删除)该怎么办:

earliest:自动将偏移量重置为最早的偏移量

latest:自动将偏移量重置为最新偏移量

none:如果没有找到消费者组的先前偏移量,则向消费者抛出异常。其他:向消费者抛出异常。