我使用该属性将生产者配置为 10 秒超时transaction.timeout.ms。然而,交易似乎在 60 秒后中止,这个时间要长得多。
请看下面的程序:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", brokerConnectionString);
properties.setProperty("transactional.id", "my-transactional-id");
properties.setProperty("transaction.timeout.ms", "5000");
// start the first producer and write one event
KafkaProducer<String, String> producer =
new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer());
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic", "value"));
// note the transaction is left non-completed
// start another producer with different txn.id and write second event
properties.setProperty("transactional.id", "another-transactional-id");
KafkaProducer<String, String> producer2 =
new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer());
producer2.initTransactions();
producer2.beginTransaction();
producer2.send(new ProducerRecord<>("topic", "value2"));
producer2.commitTransaction();
// consume the events using read-committed
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", brokerConnectionString);
consumerProps.setProperty("group.id", "my-group");
consumerProps.setProperty("auto.offset.reset", "earliest");
consumerProps.setProperty("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(consumerProps, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(singleton("topic"));
while (true) {
for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
logger.info(record.toString());
}
}
Run Code Online (Sandbox Code Playgroud)
大约 60 秒后打印,这value2是该transaction.timeout.ms参数的默认值。难道是我对房产的理解有误?
在写问题的过程中我找到了答案。Broker 配置为每 60 秒检查一次超时的生产者,因此事务将在下次检查时中止。该属性对其进行配置:transaction.abort.timed.out.transaction.cleanup.interval.ms. 我在测试前启动了代理,这就是为什么它总是需要大约 60 秒的时间。
| 归档时间: |
|
| 查看次数: |
6545 次 |
| 最近记录: |