小编Joh*_*hnD的帖子

如何在@KafkaListeners中将groupId设置为null

与:这个问题相关

我正在尝试通过 @KafkaListener 阅读压缩主题。我希望每个消费者每次都能阅读整个主题。

我无法为每个消费者生成唯一的 groupId。所以我想使用一个空的groupid。

我尝试配置容器和消费者以将 groupId 设置为 null,但都不起作用。

这是我的容器配置:

 ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        // Set ackMode to manual and never commit, we are reading from the beginning each time
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.getContainerProperties().setAckOnError(false);
        // Remove groupId, we are consuming all partitions here
        factory.getContainerProperties().setGroupId(null);
        // Enable idle event in order to detect when init phase is over
        factory.getContainerProperties().setIdleEventInterval(1000L);
Run Code Online (Sandbox Code Playgroud)

还尝试强制消费者配置:

Map<String, Object> consumerProperties = sprinfKafkaProperties.buildConsumerProperties();
        // Override group id property to force "null"
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, null);
        ConsumerFactory<Object, Object> …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-kafka

9
推荐指数
1
解决办法
2万
查看次数

Spring Kafka 和事务

我想将 Spring Kafka 与事务一起使用,但我不太明白它应该如何配置以及它如何工作。

这是我的配置

    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    props.put(ProducerConfig.ACKS_CONFIG, "all");
Run Code Online (Sandbox Code Playgroud)

此配置用于带有事务 id 前缀的 DefaultKafkaProducerFactory 中:

defaultKafkaProducerFactory.setTransactionIdPrefix("my_app.");
Run Code Online (Sandbox Code Playgroud)

问题1:

我该如何选择这个交易 ID 前缀?如果我理解正确的话,spring 使用这个前缀为每个创建的生产者生成一个事务 ID。

为什么我们不能只使用“UUID.randomUUID()”?

问题2:

如果生产者被销毁,它将生成一个新的事务ID。因此,如果应用程序崩溃,重新启动时它将重用旧的事务 ID。

这正常吗???

问题3:

我正在使用部署在云上的应用程序,该应用程序可以自动放大/缩小。这意味着我的前缀无法修复,因为每个实例上的所有生产者都将具有冲突的事务 ID。

我应该在其中添加随机部分吗?当实例缩小/放大或崩溃并重新启动时,我是否需要恢复相同的前缀?

问题4:

最后但并非最不重要的一点是,我们正在使用 Kafka 的凭据。这似乎不起作用:

Current ACLs for resource `TransactionalId:my_app*`:
    User:CN... has Allow permission for operations: All from hosts: *
Run Code Online (Sandbox Code Playgroud)

知道我的事务 ID 已生成,我应该如何设置 ACL?

编辑1

进一步阅读后,如果我理解正确的话。

如果您有从 P0(分区)读取 C0(消费者)。如果代理启动消费者重新平衡。P0 可以分配给另一个消费者 C1。这个消费者 C1 应该使用与之前的 C0 相同的事务 ID 以防止重复(僵尸围栏)?

你如何在 spring-kafka 中实现这一点?事务 ID 似乎与消费者无关,因此与分区读取无关。

谢谢

spring spring-transactions apache-kafka spring-kafka

4
推荐指数
1
解决办法
5470
查看次数

Kafka如何将生产者重试设置为无限

如何将 spring-boot 属性: spring.kafka. Producer.retries 设置为 Integer.MAX_VALUE ?

取消设置此属性是否有效,或者默认为 0 ?

@查看KIP中的默认kafka https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

apache-kafka spring-boot spring-kafka

4
推荐指数
1
解决办法
9285
查看次数