标签: spring-kafka

spring kafka消费者中的无限重试@retryabletopic

我正在使用 @RetryableTopic 在 kafka 消费者中实现重试逻辑。我给出的配置如下:

@RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = 300000, multiplier = 10.0),
            autoCreateTopics = "false",
            topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
    )
Run Code Online (Sandbox Code Playgroud)

然而,它不是重试 4 次,而是无限重试,而且也没有延迟时间。有人可以帮我解决代码吗?我希望该消息重试 4 次,第一次延迟 - 5 分钟后,然后第二次延迟 10 分钟后,第三次延迟 20 分钟后......

代码如下:

int i = 1;

@RetryableTopic(
        attempts = "4",
        backoff = @Backoff(delay = 300000, multiplier = 10.0),
        autoCreateTopics = "false",
        topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
)
@KafkaListener(topics = "topic_string_data", containerFactory = "default")
public void consume(@Payload String message , @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    String prachi = null; …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-retry kafka-consumer-api spring-kafka

2
推荐指数
1
解决办法
1万
查看次数

RetryTemplate 与 @RetryableTopic

我试图理解 RetryTemplate 和 @RetryableTopic 之间的异同。

RetryTemplate 定义在 KafkaListenerContainerFactory 内部,@RetryableTopic 注解与 @KafkaListener 一起添加。这更多的是风格上的差异还是它们有两个不同的目的?

apache-kafka spring-boot spring-kafka

2
推荐指数
1
解决办法
3839
查看次数

Springs Kafka Consumer最佳实践:消费者应该收到什么样的消息

我需要开始使用卡夫卡。我很难弄清楚消费者应该收到什么:根据我的理解,我们可以通过多种方式配置消费者:

示例1:

@KafkaListener(topics = "topic_name)
public void receiveSimpleString(@Payload String message) {
}
Run Code Online (Sandbox Code Playgroud)

示例2:

@KafkaListener(topics = "topic_name)
public void receiveConsumerRecord(@Payload ConsumerRecord<String, String> message) {
}
Run Code Online (Sandbox Code Playgroud)

示例3:

@KafkaListener(topics = "topic_name)
public void receiveObject(@Payload SomeCustomClass message) {
}
Run Code Online (Sandbox Code Playgroud)

示例4:

@KafkaListener(topics = "topic_name)
public void receiveSpringMessage(@Payload org.springframework.messaging.Message<T> message) {
}
Run Code Online (Sandbox Code Playgroud)

也许还有更多的方法,但那些曾经是我在研究 kafka+spring 时最常看到的。

现在的问题是:

是否有关于消费者应该收到什么的最佳实践?不同的例子有优点/缺点吗?

java apache-kafka spring-boot kafka-consumer-api spring-kafka

2
推荐指数
1
解决办法
942
查看次数

卡夫卡| 恰好一次消费者多次消费一条消息

在我们的应用程序中,生产者和消费者都启用了一次。

Producer是一个python组件。我们已经启用:

  • 幂等性
  • 使用交易(每次发送消息时都会使用新的transactionId)

Consumer 是一个 Spring Boot 应用程序。我们已启用:

  • read_commited 隔离级别
  • 对消息使用手动确认

我们在 ConfluenceCloud 上有多分区 Kafka 主题(假设有 3 个分区)。

我们的应用程序设计如下:

  • 多个 Producer 应用程序实例
  • 为了性能,我们有很多消费者应用实例(目前大约 24 个)

在此输入图像描述

问题:

我们注意到,有时同一条 Kafka 消息会在 Consumer 中被多次消费。我们通过使用以下消费者代码检测到了这一点。我们将之前消费的kafka消息Id(带偏移量)保存在Redis中,并与新消费的消息进行比较。

消费者代码:

 @KafkaListener(topics = "${datalake.datasetevents.topic}",  groupId = "${spring.kafka.consumer.group-id}")
    public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                       @Header(KafkaHeaders.OFFSET) String offset,
                       @Payload InputEvent inputEvent, Acknowledgment acknowledgment)  {
        //KafkaHeaders.
        Event event = new Event();
        event.setCorrId(inputEvent.getCorrId());
        event.setQn(inputEvent.getQn());
        event.setCreatedTs(new Date());
        event.setEventTs(inputEvent.getEventTs());
        event.setMeta(inputEvent.getMeta() != null ? inputEvent.getMeta(): new HashMap<>());
        event.setType(inputEvent.getType());
        event.setUlid(key);

        //detect message duplications
        try {
            String eventRedisKey …
Run Code Online (Sandbox Code Playgroud)

python java apache-kafka spring-kafka

2
推荐指数
1
解决办法
1862
查看次数

从 Spring Kafka Consumer 禁用自动主题创建

当主题不存在时,我不想从我的消费者应用程序自动创建主题。

我知道这是一个 Kafka 服务器级别的配置,用于禁用自动主题创建 ( auto.create.topics.enable = false),但我无法在我的基础设施中进行此更改。

因此,我正在寻找一种方法来禁用我的消费者应用程序中的自动主题创建(使用 Spring Kafka)。

我尝试设置

spring:
  kafka:
    consumer:
      properties:
        allow.auto.create.topics: false
Run Code Online (Sandbox Code Playgroud)

但它不起作用!

似乎 Kafka 添加了此支持: https://cwiki.apache.org/confluence/display/KAFKA/KIP-361%3A+Add+Consumer+Configuration+to+Disable+Auto+Topic+Creation

有人可以帮忙吗?

java apache-kafka kafka-consumer-api spring-kafka

2
推荐指数
1
解决办法
6263
查看次数

Spring Kafka倾听所有主题并调整分区偏移

基于spring-kafka的文档,我使用基于注释的@KafkaListener来配置我的消费者.

我看到的是 -

  1. 除非我将偏移量指定为零,否则在开始时,Kafka消费者将收集未来的消息,而不是现有的消息.(我知道这是预期的结果,因为我没有指定我想要的偏移量)

  2. 我在文档中看到一个选项,用于指定主题+分区组合以及零偏移,但如果我这样做 - 我必须明确指定我希望我的消费者听哪个主题.

使用上面的方法2,这就是我的消费者现在的样子 -

@KafkaListener(id = "{group.id}",
        topicPartitions = {
                @TopicPartition(topic = "${kafka.topic.name}",
                        partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
        },
        containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String payload,
                   Acknowledgment ack) throws InterruptedException, IOException {

    logger.debug("This is what we received in the Kafka Consumer = " + payload);

    idService.process(payload);

    ack.acknowledge();
}
Run Code Online (Sandbox Code Playgroud)

虽然我知道有一个选项可以指定"topicPattern"通配符或"主题"列表作为注释配置的一部分,但我没有看到我可以提供偏移值从零开始的地方列出的主题/主题模式.有没有办法将两者结合起来?请指教.

spring-integration apache-kafka spring-kafka

1
推荐指数
1
解决办法
5435
查看次数

如何在多个消费者处读取相同的kafka消息

我是Kakfa Spring集成的新手.我已经实现了Kafka消息发送和One Listener,它对我来说很好.但是我希望在两个地方听到同样的信息.谁能帮我.以下是我的代码.

 spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.group-id=sample-group


spring.kafka.producer.batch-size= 16384
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.retries= 0
spring.kafka.producer.buffer-memory=33554432

spring.kafka.template.default-topic=spring-topic
Run Code Online (Sandbox Code Playgroud)

发件人代码:

public void testSimple() {
        System.out.println("Sending Topic Here");
        template.send("spring-topic", 0, "foo");
        template.flush();
    }
Run Code Online (Sandbox Code Playgroud)

接收器:

@KafkaListener(id = "foo", topics = "spring-topic")
    public void listen1(String foo) {
        System.out.println("Got Notification Here");
        this.latch1.countDown();
    }
Run Code Online (Sandbox Code Playgroud)

可以帮助我如何在不同的地方阅读相同的消息.

apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

1
推荐指数
1
解决办法
1087
查看次数

Spring Kafka-试图了解幕后的工作原理

考虑以下代码 -

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
      bootstrapAddress);
    props.put(
      ConsumerConfig.GROUP_ID_CONFIG, 
      groupId);
    props.put(
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class);
    props.put(
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
  kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory
      = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
Run Code Online (Sandbox Code Playgroud)

我创建了一个消费者工厂和一个concurrentKafkaListenercontainer Factory.我没有为监听器Factory设置并发性.我有一个用@KafkaListener注释的方法

@KafkaListener(topics = "topicName")
public void listen(String message) {
    System.out.println("Received Message: " + message);
Run Code Online (Sandbox Code Playgroud)

当我不设置并发属性时,Spring会创建1个消费者实例,1个kafka监听器容器属于消费者工厂中指定的组吗?

如果我将并发性更改为3,那么Spring会创建3个消费者实例,因此在配置消费者工厂和3个侦听器容器时,同一个消费者组中有3个消费者吗?

此外,根据并发性,我们假设我们现在只听一个主题,我们将有3个用@kafkalistener注释的方法,如果没有指定分区,则所有3个方法都会监听不同的分区(由kafka以循环方式提供).?

我是卡夫卡的新手,想澄清我的理解.

java spring spring-kafka

1
推荐指数
1
解决办法
992
查看次数

Kafka-在Consumer中反序列化对象

我们正在考虑在我们的消息传递中使用Kafka,并且我们的应用程序是使用Spring开发的。因此,我们计划使用spring-kafka。

生产者将消息作为HashMap对象放入队列。我们有JSON序列化程序,并且我们假设地图将被序列化并放入队列中。这是生产者配置。

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
        key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Run Code Online (Sandbox Code Playgroud)

另一方面,我们有一个侦听器,用于侦听生产者发布消息的相同主题。这是使用者配置:

spring:
   kafka:
       consumer:
            group-id: xyz
            key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
            value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
Run Code Online (Sandbox Code Playgroud)

我们的监听器方法:

  public void listener(SomeClass abx)
Run Code Online (Sandbox Code Playgroud)

我们预期json将被反序列化,并且将生成“ SomeClass”类型的对象。但是显然,它引发了反序列化异常。

我们看到的文章很少,建议这样做是:

 @Bean
  public ConsumerFactory<String, Car> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
        new JsonDeserializer<>(Car.class));
  }
Run Code Online (Sandbox Code Playgroud)

我们不想编写一些代码来创建反序列化器。有没有我们所缺少的样板东西?任何帮助将不胜感激!!

java apache-kafka spring-kafka

1
推荐指数
1
解决办法
5338
查看次数

如何为@KafkaListener编写单元测试?

试图弄清楚我是否可以使用spring-kafka和spring-kafka-test为@KafkaListener编写单元测试。

我的侦听器类。

public class MyKafkaListener {
@Autowired
private MyMessageProcessor myMessageProcessor;

@KafkaListener(topics = "${kafka.topic.01}", groupId = "SF.CLIENT", clientIdPrefix = "SF.01", containerFactory = "myMessageListenerContainerFactory")
public void myMessageListener(MyMessage message) {
    myMessageProcessor.process(message);
    log.info("MyMessage processed");
}}
Run Code Online (Sandbox Code Playgroud)

我的测试班:

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = {"I1.Topic.json.001"})
@ContextConfiguration(classes = {TestKafkaConfig.class})
public class MyMessageConsumersTest {

@Autowired
private MyMessageProcessor myMessageProcessor;

@Value("${kafka.topic.01}")
private String TOPIC_01;

@Autowired
private KafkaTemplate<String, MyMessage> messageProducer;

@Test
public void testSalesforceMessageListner() {
    MyMessageConsumers myMessageConsumers = new MyMessageConsumers(mockService);
    messageProducer.send(TOPIC_01, "MessageID", new MyMessage());
    verify(myMessageProcessor, times(1)).process(any(MyMessage.class));
}}
Run Code Online (Sandbox Code Playgroud)

我的测试配置类:

    @Configuration
    @EnableKafka …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-kafka

1
推荐指数
3
解决办法
3040
查看次数