标签: spring-kafka

使用spring-kafka保证消息顺序的指数退避

我正在尝试实现一个基于Spring Boot的Kafka消费者,它具有一些非常强大的消息传递保证,即使在出现错误的情况下也是如此.

  • 必须按顺序处理来自分区的消息,
  • 如果消息处理失败,应暂停特定分区的消耗,
  • 应该通过退避重试处理,直到成功为止.

我们当前的实施满足以下要求:

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(retryTemplate());

    final ContainerProperties containerProperties = factory.getContainerProperties();
    containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
    containerProperties.setErrorHandler(errorHandler());

    return factory;
  }

  @Bean
  public RetryTemplate retryTemplate() {

    final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);
    backOffPolicy.setMultiplier(1.5);

    final RetryTemplate template = new RetryTemplate();
    template.setRetryPolicy(new AlwaysRetryPolicy());    
    template.setBackOffPolicy(backOffPolicy);

    return template;
  }

  @Bean
  public ErrorHandler errorHandler() {
    return new SeekToCurrentErrorHandler();
  }
Run Code Online (Sandbox Code Playgroud)

但是,在这里,记录永远被消费者锁定.在某些时候,处理时间将超过max.poll.interval.ms,服务器将分区重新分配给其他消费者,从而创建一个副本.

假设max.poll.interval.ms等于5分钟(默认)并且故障持续30分钟,这将导致消息被处理ca. 6次.

另一种可能性是在N次重试(例如3次尝试)之后通过使用将消息返回到队列SimpleRetryPolicy.然后,将重播该消息(感谢SeekToCurrentErrorHandler)并且处理将从头开始,最多再次尝试5次.这导致延迟形成一系列例如

10 secs …
Run Code Online (Sandbox Code Playgroud)

messaging apache-kafka spring-retry spring-kafka exponential-backoff

7
推荐指数
1
解决办法
4565
查看次数

Spring Kafka该类不在受信任的软件包中

在库更新之前的我的Spring Boot / Kafka应用程序中,我使用以下类org.telegram.telegrambots.api.objects.Update将消息发布到Kafka主题。现在,我使用以下内容org.telegram.telegrambots.meta.api.objects.Update。如您所见-它们具有不同的软件包。

重新启动应用程序后,我遇到了以下问题:

[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition telegram.fenix.bot.update-0 at offset 4223. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.telegram.telegrambots.api.objects.Update' is not in the trusted packages: [java.util, java.lang, org.telegram.telegrambots.meta.api.objects]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka

7
推荐指数
6
解决办法
7040
查看次数

当我运行测试时,Spring Embedded Kafka失败

我有一个弹簧嵌入式kafka问题,我想用它来测试我的kafka发送器/接收器。

当我尝试使用以下方式运行测试时:

@RunWith(MockitoJUnitRunner.class)
@SpringBootTest
@DirtiesContext
public class myTestClass {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka =
        new EmbeddedKafkaRule(1, true, RECEIVER_TOPIC);

    @Test public void test() {
        System.out.println("@Test");
    }
}
Run Code Online (Sandbox Code Playgroud)

我收到一个错误:

java.io.IOException: Failed to load C:\Users\username\AppData\Local\Temp\kafka-8251150311475880576 during broker startup
...
15:29:33.135 [main] ERROR kafka.log.LogManager - Shutdown broker because none of the specified log dirs from C:\Users\username\AppData\Local\Temp\kafka-8251150311475880576 can be created or validated
Run Code Online (Sandbox Code Playgroud)

我确定作为用户我可以访问此目录,并且在运行测试时可以看到kafka在temp文件夹上创建了此类目录(空),但仍然无法正常工作。

这是我的pom配置:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <version>2.2.0.RC1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.0.RC1</version>
        <scope>test</scope>
    </dependency>
Run Code Online (Sandbox Code Playgroud)

appliacation.properties:

kafka.bootstrap-servers= ${spring.embedded.kafka.brokers}
Run Code Online (Sandbox Code Playgroud)

有趣的是,我从互联网上克隆了一个用于测试目的的kafka嵌入式示例,它工作正常,但是当我将其应用于我的项目时,它却像上面那样崩溃了。

有什么建议我在做什么错?

spring-kafka

7
推荐指数
0
解决办法
915
查看次数

如何将@Transactional与@KafkaListener一起使用?

是否有可能将声明式交易管理(通过@Transactional)与@KafkaListener注释方法一起使用?例如,我想使用它来为每个侦听器定义单独的 tx 超时。我的设置如下:

交易管理器:

@Bean
@ConditionalOnBean(value = {HibernateTransactionManager.class})
public ChainedKafkaTransactionManager<Object, Object> chainedHibernateTm(KafkaTransactionManager<String, String> kafkaTransactionManager,
                                                                                 org.springframework.orm.hibernate5.HibernateTransactionManager hibernateTransactionManager) {
  return new ChainedKafkaTransactionManager<>(
    kafkaTransactionManager, 
    hibernateTransactionManager);
}
Run Code Online (Sandbox Code Playgroud)

卡夫卡监听器:

@KafkaListener(topic = "my_topic")
@Transactional(timeout = 5)
public void handleMessage(SomeMessage message){
}
Run Code Online (Sandbox Code Playgroud)

问题是 - KafkaMessageListenerContainer 在调用此类方法之前创建自己的事务 - 它使用自己的 TransactionTemplate:

@Nullable
private TransactionTemplate determineTransactionTemplate() {
  return this.transactionManager != null
    ? new TransactionTemplate(this.transactionManager)
    : null;
}

Run Code Online (Sandbox Code Playgroud)

未使用 TransactionInterceptor。那么如何为具体的 @KafkaListener 方法设置特定的 tx 超时呢?

java spring-transactions spring-boot spring-kafka

7
推荐指数
1
解决办法
5988
查看次数

在 Spring Boot 应用程序中使用 Confluence 中的架构注册表与 Avro 和 Kafka

首先,我必须说我对 confluence 并不熟悉。

我正在关注本教程:https://www.confluence.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/,但我陷入了困境。

我无法为 Kafka 创建使用者,因为我收到错误:io.confluence.common.config.ConfigException:缺少没有默认值的所需配置“schema.registry.url”。

我在 yml 配置中找不到此架构属性。

Confluence 在本地运行:

$: confluent local start
zookeeper is already running. Try restarting if needed
kafka is already running. Try restarting if needed
schema-registry is already running. Try restarting if needed
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
Run Code Online (Sandbox Code Playgroud)

在 Spring 中设置用户主题后,从控制中心我看到了不同的模式:

{
  "connect.name": "ksql.users",
  "fields": [
    {
      "name": "registertime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-kafka confluent-schema-registry

7
推荐指数
1
解决办法
1万
查看次数

无法从 Kafka 中的消费者向死信主题发送消息

“我正在尝试将消息路由到 Kafka 中的死信主题,以防处理相应消息时出现任何失败。我已经为此功能设置了 SeektoCurrentErrorHandler 和 DeadLetterPublishingRecoverer。

消费者在执行此操作时抛出以下异常:

2020-08-07 12:09:38.841 ERROR 1 --- [ntainer#2-0-C-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='a6558a22-470d-4708-b297-814996a42045' and payload='{123, 34, 101, 118, 101, 110, 116, 78, 97, 109, 101, 34, 58, 34, 116, 101, 115, 116, 95, 101, 120, 1...' to topic test_execution.DLT and partition 2:

org.apache.kafka.common.errors.TimeoutException: Topic test_execution.DLT not present in metadata after 60000 ms.

2020-08-07 12:09:38.846 ERROR 1 --- [ntainer#2-0-C-1] o.s.k.l.DeadLetterPublishingRecoverer    : Dead-letter publication failed for: ProducerRecord(topic=test_execution.DLT, partition=2, headers=RecordHeaders(headers = [RecordHeader(key = …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

7
推荐指数
1
解决办法
3628
查看次数

在 Spring Boot 应用程序中实现 Reactive Kafka Listener

我正在尝试在我的 Spring Boot 应用程序中实现反应式 kafka 消费者,我正在查看这些示例:https : //github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main /java/reactor/kafka/samples/SampleScenarios.java

看起来反应式 kafka 中尚不支持 Spring

我了解 kafka 侦听器如何在 Spring 中的非反应式 kafka API 中工作:最简单的解决方案是为 ConcurrentKafkaListenerContainerFactory 和 ConsumerFactory 配置 bean,然后使用 @KafkaListener 注释和瞧

但是我现在不确定如何在 Spring 中正确使用反应式 kafka。

基本上我需要一个话题的听众。我应该创建某种循环或我自己的调度程序吗?或者也许我错过了一些东西。任何人都可以分享他们的知识和最佳实践吗?

java kafka-consumer-api reactive-kafka spring-kafka reactor-kafka

7
推荐指数
1
解决办法
2036
查看次数

为 Kafka Consumer 随机生成组 ID

我们有一个使用 spring kafka 来读取消息的应用程序。应用程序的每个实例都必须有一个唯一的 groupId,并重置它并在重新启动时获取一个新的 groupId。GroupId 是通过 随机生成的${random.uuid}

随机生成id的解决方案真的正确吗?

java apache-kafka spring-boot kafka-consumer-api spring-kafka

7
推荐指数
1
解决办法
5192
查看次数

无法从 [java.lang.String] 转换为 [com.example.demo.User]

我正在研究Spring Boot 和 Apache Kafka - 尝试拥有用户定义的配置 -

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.demo.Consumer.consume(com.example.demo.User) throws java.io.IOException]
Bean [com.example.demo.Consumer@7cd4a8cc]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.demo.User] for GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}], failedMessage=GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-boot spring-kafka

7
推荐指数
1
解决办法
1万
查看次数

嵌入式kafka和testcontainers kafka的区别?

我有简单的 spring boot kafka 监听器。我想测试一下。

使用嵌入式 kafka 进行测试 VS 使用测试容器 kafka 进行测试有何缺点?我熟悉测试容器,它提供了完全功能性的kafka。嵌入式kafka相对于容器有什么局限性?(请具体例子)

spring-boot spring-kafka

7
推荐指数
1
解决办法
3314
查看次数