标签: kafka-transactions-api

什么是卡夫卡交易?

Kafka 中的事务是什么意思?我当然知道普通的SQL事务:A transaction is a sequence of operations performed (using one or more SQL statements) on a database as a single logical unit of work

那么这是否意味着可以向 Kafka 发送一些内容,如果出现问题,它将被回滚(从分区中删除消息?)
并且是否可以在事务中写入不同的主题?

apache-kafka kafka-transactions-api

8
推荐指数
1
解决办法
1万
查看次数

如何在 Kubernetes(仅生产者端事务)设置中选择 Kafka transactional.id

我有一个 Kafka 包装器库,它仅在生产端使用事务。图书馆不覆盖消费者。制作者发布到多个主题。目标是实现交易性。因此,生产应该要么成功,这意味着每个主题中应该只有一次写入的消息副本,要么失败,这意味着消息没有写入任何主题。该库的用户是在 Kubernetes Pod 上运行的应用程序。因此,Pod 可能会失败或频繁重新启动。此外,发送消息时不会显式设置分区。

我的问题是,我应该如何为生产者选择 transactional.id?我的第一个想法是在对象启动时简单地选择 UUID,并将 transaction.timeout.ms 设置为某个合理的时间(几秒钟)。这样,如果生产者由于 Pod 重新启动而终止,消费者不会永远锁定事务。

这个策略有什么缺陷吗?有没有更聪明的方法来做到这一点?另外,我不能向图书馆用户索要某种 ID。

apache-kafka kubernetes kafka-transactions-api

5
推荐指数
1
解决办法
704
查看次数

Kafka 事务性读已提交消费者

我在应用程序中有事务性和普通的生产者,它们正在写入主题 kafka-topic ,如下所示。

事务性 Kafka Producer 的配置

@Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        /*The amount of time to wait before attempting to retry a failed request to a given topic partition. 
         * This avoids repeatedly sending requests in a tight loop under some failure scenarios.*/
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
        /*"The configuration controls the …
Run Code Online (Sandbox Code Playgroud)

transactions apache-kafka kafka-consumer-api spring-kafka kafka-transactions-api

1
推荐指数
1
解决办法
1433
查看次数

我们可以在读取过程场景中应用 Kafka 的一次性语义吗?

我们如何确保 Kafka 在读过程场景中的精确一次语义。read 意味着我们正在从 Kafka 主题中读取数据并进行一些处理,然后我们尝试提交偏移量。假设我们处理了消息,但无法提交,并且在提交之前进程崩溃了。重新启动后,再次尝试使用相同的消息。那么如何处理这样的场景呢?这可以用 Kafka Transaction API 来处理吗?

有类似的问题,但无法正确理解它,也留下了很少的评论。只是想确认我的理解。 对 Kafka 的一次性语义感到困惑

apache-kafka kafka-transactions-api

0
推荐指数
1
解决办法
3443
查看次数