标签: spring-kafka

Spring Kafka - 如何使用组ID将偏移重置为最新?

我目前正在使用Spring Integration Kafka进行实时统计.但是,组名使Kafka搜索了侦听器未读取的所有先前值.

@Value("${kafka.consumer.group.id}")
private String consumerGroupId;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}

public Map<String, Object> getDefaultProperties() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return properties;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaMessageListener listener() {
    return new KafkaMessageListener();
}
Run Code Online (Sandbox Code Playgroud)

我想开始最新的偏移,而不是被旧的价值所困扰.是否有可能重置组的偏移量?

java spring spring-integration apache-kafka spring-kafka

7
推荐指数
2
解决办法
7998
查看次数

Spring Kafka-使用Producer Listener配置KafkaTemplate与使用Listenable Future注册回调之间的区别

因此,我正在阅读Spring kafka文档,并遇到了Producer Listener。这就是Spring Kafka文档所说的-

“可选地,您可以配置带有ProducerListener的KafkaTemplate来获取发送(成功或失败)结果的异步回调,而不必等待Future完成。”

他们还指定了接口-

     public interface ProducerListener<K, V> {

      void onSuccess(String topic, Integer partition, K key, V value, 
      RecordMetadata recordMetadata);

      void onError(String topic, Integer partition, K key, V value, 
      Exception exception);

      boolean isInterestedInSuccess();

  }
Run Code Online (Sandbox Code Playgroud)

因此,我的理解是,如果您想对消息的成功和失败做一些事情,请实现ProducerListener接口,并将其注册到KafkaTemplate。它是异步的,因此您不必等待将来完成,就可以知道发送操作的结果。

在此下方大约3句话,它提到您还可以使用KakfaTemplate的send方法返回的ListenableFuture添加回调。这也是异步的。

 future.addCallback(new ListenableFutureCallback<SendResult<Integer, 
   String>>() {

 @Override
 public void onSuccess(SendResult<Integer, String> result) {
    ...
  }

  @Override
  public void onFailure(Throwable ex) {
    ...
  }

  });
Run Code Online (Sandbox Code Playgroud)

所以我想知道两者之间的确切区别是什么,因为它们都是异步的。是onSuccess和onFailure / onError方法中接收到的数据之间的区别。或者是在将回调函数添加到ListenableFuture之前(因为将来在不阻塞it-get()方法的情况下不知道异步计算的结果)之前,已经开发了在KafkaTemplate中添加ProducerListener的功能(反之亦然) 。因此,仅为了确保向后兼容,两者都可以继续使用。使用一种方法相对于其他方法是否具有任何性能优势。

先感谢您。

java spring spring-kafka

7
推荐指数
1
解决办法
2071
查看次数

Spring Boot / Kafka Json 反序列化 - 可信包

我刚刚开始在 Spring Boot 中使用 Kafka 并想发送和使用 JSON 对象。

当我尝试使用来自 Kafka 主题的消息时,出现以下错误:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition dev.orders-0 at offset 9903. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'co.orders.feedme.feed.domain.OrderItem' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) …
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka spring-boot spring-kafka

7
推荐指数
1
解决办法
1万
查看次数

Spring Kafka该类不在受信任的软件包中

在库更新之前的我的Spring Boot / Kafka应用程序中,我使用以下类org.telegram.telegrambots.api.objects.Update将消息发布到Kafka主题。现在,我使用以下内容org.telegram.telegrambots.meta.api.objects.Update。如您所见-它们具有不同的软件包。

重新启动应用程序后,我遇到了以下问题:

[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition telegram.fenix.bot.update-0 at offset 4223. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.telegram.telegrambots.api.objects.Update' is not in the trusted packages: [java.util, java.lang, org.telegram.telegrambots.meta.api.objects]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka

7
推荐指数
6
解决办法
7040
查看次数

如何使用 Spring-Kafka 通过 Confluent Schema 注册表读取 AVRO 消息?

如何使用 Spring-Kafka 通过 Confluent Schema 注册表读取 AVRO 消息?有样品吗?我在官方参考文件中找不到它。

avro spring-kafka confluent-schema-registry confluent-platform

7
推荐指数
2
解决办法
1万
查看次数

Spring Kafka SeekToCurrentErrorHandler 找出失败的记录

我已经用KafkaHandler. 我的消费者应该消费事件,然后针对每个事件向其他服务发送 REST 请求。我只想在该 REST 服务关闭时重试。否则,我可以忽略失败的事件。

我的容器工厂配置如下:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
  kafkaListenerContainerFactory() {

  ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
    new ConcurrentKafkaListenerContainerFactory<>();

  factory.setConsumerFactory(consumerFactory());
  factory.setStatefulRetry(true);
  factory.setRetryTemplate(retryTemplate());
  factory.setConcurrency(3);

  ContainerProperties containerProperties = factory.getContainerProperties();
  containerProperties.setAckOnError(false);
  containerProperties.setAckMode(AckMode.RECORD);
  containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());

  return factory;
}
Run Code Online (Sandbox Code Playgroud)

ExceptionClassifierRetryPolicy用于设置异常和相应的重试策略。

重试后一切看起来都很好。当我得到一个时它会重试,当我得到一个时ConnectException它会忽略IllegalArgumentException

然而,在IllegalArgumentException场景中,SeekToCurrentErrorHandler返回到未处理的偏移量(因为它寻找未处理的消息,包括失败的消息),最终立即重试失败的消息。消费者不断地来回并重试百万次。

如果我有机会了解哪个记录失败了SeekToCurrentErrorHandler,那么我将创建一个自定义实现SeekToCurrentErrorHandler来检查失败的消息是否可重试(通过使用该thrownException字段)。如果它不可重试,那么我会将它从列表中删除records以寻找回来。

关于如何实现此功能的任何想法?

注:enable.auto.commit设为falseauto.offset.reset设为earliest

谢谢!

apache-kafka spring-retry spring-kafka

7
推荐指数
1
解决办法
9247
查看次数

spring kafka 标题中没有类型信息,也没有提供默认类型

我有一个 Spring Boot 应用程序,它定义了:

  • 写入 kafka 主题 STREAM_TOPIC_IN_QQQ 的 REST 控制器
  • 从 STREAM_TOPIC_IN_QQQ (groupId="bar") 和日志读取的 KafkaListener
  • 一个查看主题并记录它的 KStream,将其转换为另一种类型,然后将其写入 STREAM_TOPIC_OUT_QQQ
  • 另一个从 STREAM_TOPIC_OUT_QQQ 读取的 KafkaListener。

(我已经改变了后缀,以避免任何可能的混乱,手工创建的主题,否则我得到一个警告,STREAM_TOPIC_IN_ XXX = LEADER_NOT_AVAILABLE和流不会为一分钟左右运行。)

第一个侦听器和流似乎正在工作,但是当 STREAM_OUT_TOPIC 上的侦听器尝试反序列化消息时,我收到以下异常。我正在使用 Produced.with 提供流中的 serde。我需要做什么才能让侦听器知道要反序列化的类型?

日志

11 Mar 2019 14:34:00,194   DEBUG    [KafkaMessageController [] http-nio-8080-exec-1]   Sending a Kafka Message
11 Mar 2019 14:34:00,236   INFO     [KafkaConfig [] kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1]   -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
11 Mar 2019 14:34:00,241   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener: ConsumerRecord: {}ConsumerRecord(topic = STREAM_TOPIC_IN_QQQ, partition …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-boot spring-kafka

7
推荐指数
4
解决办法
3万
查看次数

如何重试来自 kafka 的失败消息?

我的spring-boot application(consumer)进程消息来自Apache Kafka. 周期性地,按摩无法处理并且消费者抛出异常。无论如何,消费者提交抵消。我可以区分 Kafka 中的成功消息和失败消息吗?我想,我不能。这是真的吗?如果是真的,我有一个主要问题:

如何重试失败消息?我知道一些方法,但我不确定它们的正确性。

1) 将偏移量更改为早期。但通过这种方式,成功消息也会重试。

2)当我捕获异常时,我将此消息发送到另一个主题(例如错误主题)。但看起来很难。

3)别的东西(你的变体)

apache-kafka spring-boot spring-kafka

7
推荐指数
2
解决办法
8523
查看次数

Kafka 流异常:GroupAuthorizationException

我正在开发一个 Kafka-Stream 应用程序,它将从输入 Kafka 主题中读取消息并过滤不需要的数据并推送到输出 Kafka 主题。

卡夫卡流配置:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {

    Map<String, Object> streamsConfiguration = new HashMap<>();
    streamsConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "QC-NormalizedEventProcessor-v1.0.0");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9072");
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), -1);
    streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaConsumerProperties.getConsumerJKSFileLocation());
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConsumerProperties.getConsumerJKSPwd());
    streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    streamsConfiguration.put(SASL_MECHANISM, "PLAIN");

    return new KafkaStreamsConfiguration(streamsConfiguration);
}
Run Code Online (Sandbox Code Playgroud)

KStream过滤逻辑:

@Bean
public KStream<String, String> kStreamJson(StreamsBuilder builder) {

    KStream<String, String> stream = builder.stream(kafkaConsumerProperties.getConsumerTopic(), Consumed.with(Serdes.String(), Serdes.String()));
    /** Printing the source message */
    stream.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams spring-kafka

7
推荐指数
1
解决办法
1万
查看次数

如何修复 kafka.common.errors.TimeoutException: Expiring 1 record(s) xxx ms 自批处理创建以来已经过去了加上逗留时间

我使用 kafka_2.11-2.1.1 和 Producer 使用 spring 2.1.0.RELEASE。

我在向 Kafka 主题发送消息时正在使用 spring,我的生产者生成了很多 TimeoutExceptions

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for COMPANY_INBOUND--19: 229 ms has passed since batch creation plus linger time
Run Code Online (Sandbox Code Playgroud)

我正在使用以下 kafka 生产者设置

acks: 1
retries: 1
batchSize: 100
lingerMs: 5
bufferMemory: 33554432
requestTimeoutMs: 60
Run Code Online (Sandbox Code Playgroud)

我尝试了很多组合(特别是batchSize& lingerMs)但没有任何效果。任何帮助请问上述场景的设置应该是什么。

使用以下配置再次尝试......但没有运气同样的错误

acks = 1
    batch.size = 15
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-producer-api spring-kafka

7
推荐指数
1
解决办法
1万
查看次数