Kafka 事务性生产者 — read_committed 显示尽管中止的记录

sal*_*ent 1 java apache-kafka kafka-producer-api

我编写了这个简单的程序来测试 Kafka 中的新事务生产者:

package test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;


class kafkatest {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "hello-world-producer");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer producer = new KafkaProducer(props);

        producer.initTransactions();
        producer.beginTransaction();

        producer.send(new ProducerRecord<>("topic", "hello", "world"));  
        producer.flush();

        producer.abortTransaction();

        producer.close();
    }
}
Run Code Online (Sandbox Code Playgroud)

但是当我使用 with 时isolation.level=read_committed,该记录会出现:

--- ~ » kafka-console-consumer --bootstrap-server localhost:9092 \
        --topic topic \ 
        --from-beginning \ 
        --consumer-property isolation.level=read_committed

world
Run Code Online (Sandbox Code Playgroud)

我错过了什么?

Mic*_*son 6

read_committed与控制台使用者一起使用,您需要指定--isolation-level选项:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic topic --from-beginning --isolation-level=read_committed
Run Code Online (Sandbox Code Playgroud)

否则,此选项默认为read_uncommitted并覆盖您通过 传递的值--consumer-property

  • 我的生命中失去了2个小时 (2认同)