Kafka 中的事务是什么意思?我当然知道普通的SQL事务:A transaction is a sequence of operations performed (using one or more SQL statements) on a database as a single logical unit of work
那么这是否意味着可以向 Kafka 发送一些内容,如果出现问题,它将被回滚(从分区中删除消息?)
并且是否可以在事务中写入不同的主题?
我有一个 Kafka 包装器库,它仅在生产端使用事务。图书馆不覆盖消费者。制作者发布到多个主题。目标是实现交易性。因此,生产应该要么成功,这意味着每个主题中应该只有一次写入的消息副本,要么失败,这意味着消息没有写入任何主题。该库的用户是在 Kubernetes Pod 上运行的应用程序。因此,Pod 可能会失败或频繁重新启动。此外,发送消息时不会显式设置分区。
我的问题是,我应该如何为生产者选择 transactional.id?我的第一个想法是在对象启动时简单地选择 UUID,并将 transaction.timeout.ms 设置为某个合理的时间(几秒钟)。这样,如果生产者由于 Pod 重新启动而终止,消费者不会永远锁定事务。
这个策略有什么缺陷吗?有没有更聪明的方法来做到这一点?另外,我不能向图书馆用户索要某种 ID。
我在应用程序中有事务性和普通的生产者,它们正在写入主题 kafka-topic ,如下所示。
事务性 Kafka Producer 的配置
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.RETRIES_CONFIG, 5);
/*The amount of time to wait before attempting to retry a failed request to a given topic partition.
* This avoids repeatedly sending requests in a tight loop under some failure scenarios.*/
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
/*"The configuration controls the …
Run Code Online (Sandbox Code Playgroud) transactions apache-kafka kafka-consumer-api spring-kafka kafka-transactions-api
我们如何确保 Kafka 在读过程场景中的精确一次语义。read 意味着我们正在从 Kafka 主题中读取数据并进行一些处理,然后我们尝试提交偏移量。假设我们处理了消息,但无法提交,并且在提交之前进程崩溃了。重新启动后,再次尝试使用相同的消息。那么如何处理这样的场景呢?这可以用 Kafka Transaction API 来处理吗?
有类似的问题,但无法正确理解它,也留下了很少的评论。只是想确认我的理解。 对 Kafka 的一次性语义感到困惑