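I am trying to set up a Kafka transactional producer on Windows 10. I downloaded Kafka and run it as a single node from the CLI, started as follows:
(only changed server.properties -> log.dirs)
Everything works fine; Kafka is up and running. The default tests with kafka-console-producer and kafka-console-consumer work well.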
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;        // assumption: the logger is SLF4J
import org.slf4j.LoggerFactory;

public class SampleTest {
    private final static Logger logger = LoggerFactory.getLogger(SampleTest.class);

    public static void main(String[] args) {
        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
        producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // enable idempotence
        producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-trx-id123"); // set transaction id
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        Producer<String, String> producer = new KafkaProducer<>(producerConfig);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            String firstMsg = "Hello 1";
            producer.send(new …
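For reference, the producer settings in the snippet above can also be written with their literal config keys, which is handy when comparing the client configuration against what the broker logs. This is only a sketch: the class name `TransactionalProducerProps` and the `build()` helper are made up for illustration, and it uses nothing beyond `java.util.Properties`, so it does not connect to a broker.

```java
import java.util.Properties;

// Hypothetical helper: mirrors the settings from the question using the
// literal key strings behind the ProducerConfig constants.
public class TransactionalProducerProps {

    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("client.id", "transactional-producer");
        // Idempotence is required for transactions; set explicitly here as in the question.
        props.setProperty("enable.idempotence", "true");
        props.setProperty("transactional.id", "test-trx-id123");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each key=value pair (iteration order is not guaranteed).
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A `Properties` object built this way can be passed to `new KafkaProducer<>(props)` in place of `producerConfig`, assuming the kafka-clients library is on the classpath.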