标签: spring-kafka

Spring Kafka 事务导致生产者每条消息的偏移量增加了 2

我在使用 Spring(boot) Kafka 的微服务中有一个消费-转换-生产工作流程。我需要实现 Kafka 事务提供的一次性语义。下面是代码片段:

配置

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
    DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(props);
    defaultKafkaProducerFactory.setTransactionIdPrefix("kafka-trx-");
    return defaultKafkaProducerFactory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory()); …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-kafka

3
推荐指数
1
解决办法
2569
查看次数

如何使用 Spring Boot 等待完整的 Kafka 消息批处理?

当批量消费 Kafka 消息时,可以使用 限制批量大小max.poll.records

如果消费者非常快并且其提交偏移量没有明显滞后,这意味着大多数批次将会小得多。我只想接收“完整”批次,即只有在达到批次大小后才调用我的消费者函数。所以我正在寻找类似 的东西min.poll.records,它不以这种形式存在。

这是我正在做的一个最小的例子:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.stereotype.Component

@SpringBootApplication
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}

@Component
class TestConsumer {
    @Bean
    fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
        val configs = kafkaProperties.buildConsumerProperties()
        configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1000
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
        factory.isBatchListener = true
        return factory
    }

    @KafkaListener(
        topics = ["myTopic"],
        containerFactory = "kafkaBatchListenerContainerFactory"
    ) …
Run Code Online (Sandbox Code Playgroud)

spring kotlin apache-kafka spring-boot spring-kafka

3
推荐指数
1
解决办法
3929
查看次数

结合 spring-kafka 和reactor-kafka 时出现意外的循环依赖

以下代码在receiverOptions和模板之间产生意外的循环依赖:

令人惊讶的是,如果从 spring 上下文中删除 kafkaProps,它会起作用。

看起来某些自动配置正在添加从模板到接收器选项的不必要的依赖项。

请建议配置 ReactiveKafkaConsumerTemplate 的正确方法。

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public Map<String, Object> kafkaProps() {
        return Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092",
            ConsumerConfig.GROUP_ID_CONFIG, DemoApplication.class.getSimpleName()
        );
    }

    @Bean
    public ReceiverOptions<String, String> receiverOptions(Map<String, Object> kafkaProps) {
        return ReceiverOptions.<String, String>create(kafkaProps)
            .withKeyDeserializer(new StringDeserializer())
            .withValueDeserializer(new StringDeserializer())
            .subscription(List.of("test-topic"));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> template(ReceiverOptions<String, String> receiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }

    @Bean
    public ApplicationRunner runner(ReactiveKafkaConsumerTemplate<String, String> kafkaReceiver) {
        return args -> kafkaReceiver.receive()
            .log()
            .subscribe();
    } …
Run Code Online (Sandbox Code Playgroud)

circular-reference spring-boot project-reactor spring-kafka

3
推荐指数
1
解决办法
1227
查看次数

如何在 Spring Kafka Stream 中配置 UncaughtExceptionHandler

我正在spring-kafka使用 Spring Boot 1.5.16 来实现流应用程序。我们使用的版本spring-kafka是1.3.8.RELEASE。

我正在寻找一种方法来关闭启动应用程序,以防出现终止与 Kafka Streams 关联的所有线程的错误。我发现在内部KafkaStreams可以注册未捕获异常的句柄。方法是setGlobalStateRestoreListener

我看到这个方法是spring-kafka在 type内部公开的KStreamBuilderFactoryBean

我的问题如下。有没有一种简单的方法将 注册UncaughtExceptionHandler为 bean 并让 Spring 正确注入到工厂 bean 中?或者我应该创建KStreamBuilderFactoryBean自己的并手动设置处理程序?

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME)
public KStreamBuilderFactoryBean kStreamBuilderFactoryBean(StreamsConfig streamsConfig) {
    final KStreamBuilderFactoryBean streamBuilderFactoryBean = new KStreamBuilderFactoryBean(
            streamsConfig);
    streamBuilderFactoryBean.setUncaughtExceptionHandler((threadInError, exception) -> {
        // Something happens here
    });
    return streamBuilderFactoryBean;
}
Run Code Online (Sandbox Code Playgroud)

多谢。

java spring-batch apache-kafka-streams spring-kafka

3
推荐指数
1
解决办法
3043
查看次数

Spring Kafka,覆盖 max.poll.interval.ms?

我需要将max.poll.interval.ms默认值 300000 增加到更大的值,因为。超时异常。

但是我无法在 application.properties 中找到属性(自动完成)来覆盖它。我错过了什么吗?或者我只是使用旧版本的 Spring Kafka (2.1.10)

max.poll.interval.ms = 300000
max.poll.records     = 500
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-kafka

3
推荐指数
1
解决办法
2万
查看次数

spring-kafka KafkaListener 中的并行处理和自动缩放

我正在使用 spring-kafka 来消费来自两个 Kafka 主题的消息,该主题发送的消息格式如下所示。

    @KafkaListener(topics = {"topic_country1", "topic_country2"}, groupId = KafkaUtils.MESSAGE_GROUP)
    public void onCustomerMessage(String message, Acknowledgment ack) throws Exception {
        log.info("Message : {}  is received", message);
        ack.acknowledge();
    }
Run Code Online (Sandbox Code Playgroud)
  • KafkaListener能否根据自己监听的主题数量来分配消费者线程数量,并并行处理两个主题中的消息?或者它不支持并行处理,消息必须在主题中等待,直到一条消息得到处理?
  • 如果主题中的消息数量较多,我需要自动缩放微服务以启动新实例(直到分区数量)。从 KafkaListener 的角度来看,我可以依靠哪些参数(CPU、内存)来找出主题中的消息数量更高?(即在 API 中,我可以通过监控 HTTP 延迟来自动扩展服务)

autoscaling apache-kafka spring-boot kafka-consumer-api spring-kafka

3
推荐指数
2
解决办法
7338
查看次数

Spring Kafka 与 Avro 反序列化器

我正在尝试将 Spring Kafka 与 Confluence 模式注册表和 Kafka Avro 反序列化器一起使用。使用 gradle 和 .avsc 我生成了 avro 类。使用生成的类,我发送通用记录并使用相同的记录。我在 kafka 监听器中收到以下错误:

Error while processing: ConsumerRecord(topic = topi_name, partition = 2, offset = 149, CreateTime = 1592288763784, serialized key size = 16, serialized value size = 38, headers = RecordHeaders(headers = [], isReadOnly = false), key = event_test, value = {"eventType": "test", "EventDataRequest": {"user": "54321", "panId": "1234", "empId": "5"}})

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void className(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, org.apache.avro.generic.GenericRecord>) throws com.test.kafka.exception.ConsumerException,org.apache.xmlbeans.XmlException,java.io.IOException,java.lang.ClassNotFoundException' threw exception; nested exception is java.lang.ClassCastException: com.test.MyPojo …
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

3
推荐指数
1
解决办法
1万
查看次数

@KafkaListener 启动问题(Spring)

需要什么

我正在编写一个使用 Kafka 获取信息的应用程序(Spring + Kotlin)。如果我在声明@KafkaListener时设置autoStartup = "true"那么应用程序可以正常工作,但前提是代理可用。当代理不可用时,应用程序在启动时崩溃。这是不受欢迎的行为。应用程序必须工作并执行其他功能。

我尝试做什么

为了避免启动时应用程序崩溃,此站点上的另一个主题建议在声明@KafkaListener时设置autoStartup = "false"。它确实有助于防止启动时崩溃。但现在我无法成功手动启动KafkaListener。在其他示例中,我看到了KafkaListenerEndpointRegistry的自动连接,但是当我尝试这样做时:

@Service
class KafkaConsumer @Autowired constructor(
        private val kafkaListenerEndpointRegistry: KafkaListenerEndpointRegistry
) {
Run Code Online (Sandbox Code Playgroud)

IntelliJ Idea 警告:

无法自动装配。未找到“KafkaListenerEndpointRegistry”类型的 bean。

当我尝试在不自动装配的情况下使用 KafkaListenerEndpointRegistry 并执行以下代码时:

@Service
class KafkaConsumer {
    private val logger = LoggerFactory.getLogger(this::class.java)
    private val kafkaListenerEndpointRegistry = KafkaListenerEndpointRegistry()

    @Scheduled(fixedDelay = 10000)
    fun startCpguListener(){
        val container = kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
        if (!container.isRunning)
            try {
                logger.info("Kafka Consumer is not running. Trying to start...")
                container.start()
            } catch (e: Exception){ …
Run Code Online (Sandbox Code Playgroud)

spring kotlin spring-kafka

3
推荐指数
1
解决办法
4419
查看次数

如何在Spring-kafka中实现ConsumerSeekAware?

我正在尝试使用 @KafkaListener 实现消费者。我使用的是Spring2.3.7版本。

这是到目前为止我的代码,

public class SampleListener {

@KafkaListener(topics = "test-topic",
        containerFactory = "sampleKafkaListenerContainerFactory",
        groupId = "test-group")
public void onMessage(@Payload String message,
                              @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                              @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                              @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long receivedTimestamp,
                              @Header(KafkaHeaders.OFFSET) long offset,
                              @Headers MessageHeaders messageHeaders) {

    LOGGER.info("Received Message for topic={} partition={} offset={} messageHeaders={}",
            topic, partition, offset, messageHeaders);
    LOGGER.debug("Received Message payload={}", message);
    doSomething(message);

   }
}
Run Code Online (Sandbox Code Playgroud)

我是卡夫卡和春天的新手。我阅读了有关如何寻求偏移量的 spring-kafka 文档,但无法完全理解。

据我了解,对于我的用例,当分区分配给容器或在任何其他场景中时,我不想再次读取事件(确保只读取一次)。

我看到大多数 Consumer 实现都实现了ConsumerSeekAware。我知道实施ConsumerSeekAware使我们能够寻求对诸如onIdleContainer或 之类的事件进行抵消onPartitionsAssigned。我无法理解这些正在处理什么场景?

  1. ConsumerSeekAware实施来处理哪些场景?实现 Kafka Consumer 需要寻求抵消的最佳实践或一般场景是什么?

  2. registerSeekCallback …

spring kafka-consumer-api spring-kafka

3
推荐指数
1
解决办法
7144
查看次数

@KafkaListener 与 ConsumerFactory groupId

我遵循了 baeldung.com 的“ Apache Kafka with Spring 简介”教程。我用以下方法设置了一个KafkaConsumerConfigkafkaConsumerFactory

private ConsumerFactory<String, String> kafkaConsumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    ...
    return new DefaultKafkaConsumerFactory<>(props);
}
Run Code Online (Sandbox Code Playgroud)

和两个“定制”工厂:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
    return kafkaListenerContainerFactory("foo");
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
    return kafkaListenerContainerFactory("bar");
}
Run Code Online (Sandbox Code Playgroud)

MessageListener课堂上,我使用@KafkaListener注释来注册消费者,以groupId监听某个主题:

@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group 'foo': " + …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api spring-kafka

3
推荐指数
1
解决办法
2701
查看次数