我目前正在使用Spring Integration Kafka进行实时统计.但是,组名使Kafka搜索了侦听器未读取的所有先前值.
@Value("${kafka.consumer.group.id}")
private String consumerGroupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}
public Map<String, Object> getDefaultProperties() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return properties;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public KafkaMessageListener listener() {
return new KafkaMessageListener();
}
Run Code Online (Sandbox Code Playgroud)
我想开始最新的偏移,而不是被旧的价值所困扰.是否有可能重置组的偏移量?
因此,我正在阅读Spring kafka文档,并遇到了Producer Listener。这就是Spring Kafka文档所说的-
“可选地,您可以配置带有ProducerListener的KafkaTemplate来获取发送(成功或失败)结果的异步回调,而不必等待Future完成。”
他们还指定了接口-
public interface ProducerListener<K, V> {
void onSuccess(String topic, Integer partition, K key, V value,
RecordMetadata recordMetadata);
void onError(String topic, Integer partition, K key, V value,
Exception exception);
boolean isInterestedInSuccess();
}
Run Code Online (Sandbox Code Playgroud)
因此,我的理解是,如果您想对消息的成功和失败做一些事情,请实现ProducerListener接口,并将其注册到KafkaTemplate。它是异步的,因此您不必等待将来完成,就可以知道发送操作的结果。
在此下方大约3句话,它提到您还可以使用KakfaTemplate的send方法返回的ListenableFuture添加回调。这也是异步的。
future.addCallback(new ListenableFutureCallback<SendResult<Integer,
String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
...
}
@Override
public void onFailure(Throwable ex) {
...
}
});
Run Code Online (Sandbox Code Playgroud)
所以我想知道两者之间的确切区别是什么,因为它们都是异步的。是onSuccess和onFailure / onError方法中接收到的数据之间的区别。或者是在将回调函数添加到ListenableFuture之前(因为将来在不阻塞it-get()方法的情况下不知道异步计算的结果)之前,已经开发了在KafkaTemplate中添加ProducerListener的功能(反之亦然) 。因此,仅为了确保向后兼容,两者都可以继续使用。使用一种方法相对于其他方法是否具有任何性能优势。
先感谢您。
我刚刚开始在 Spring Boot 中使用 Kafka 并想发送和使用 JSON 对象。
当我尝试使用来自 Kafka 主题的消息时,出现以下错误:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition dev.orders-0 at offset 9903. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'co.orders.feedme.feed.domain.OrderItem' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) …Run Code Online (Sandbox Code Playgroud) 在库更新之前的我的Spring Boot / Kafka应用程序中,我使用以下类org.telegram.telegrambots.api.objects.Update将消息发布到Kafka主题。现在,我使用以下内容org.telegram.telegrambots.meta.api.objects.Update。如您所见-它们具有不同的软件包。
重新启动应用程序后,我遇到了以下问题:
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition telegram.fenix.bot.update-0 at offset 4223. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.telegram.telegrambots.api.objects.Update' is not in the trusted packages: [java.util, java.lang, org.telegram.telegrambots.meta.api.objects]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all …Run Code Online (Sandbox Code Playgroud) 如何使用 Spring-Kafka 通过 Confluent Schema 注册表读取 AVRO 消息?有样品吗?我在官方参考文件中找不到它。
avro spring-kafka confluent-schema-registry confluent-platform
我已经用KafkaHandler. 我的消费者应该消费事件,然后针对每个事件向其他服务发送 REST 请求。我只想在该 REST 服务关闭时重试。否则,我可以忽略失败的事件。
我的容器工厂配置如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setStatefulRetry(true);
factory.setRetryTemplate(retryTemplate());
factory.setConcurrency(3);
ContainerProperties containerProperties = factory.getContainerProperties();
containerProperties.setAckOnError(false);
containerProperties.setAckMode(AckMode.RECORD);
containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
我ExceptionClassifierRetryPolicy用于设置异常和相应的重试策略。
重试后一切看起来都很好。当我得到一个时它会重试,当我得到一个时ConnectException它会忽略IllegalArgumentException。
然而,在IllegalArgumentException场景中,SeekToCurrentErrorHandler返回到未处理的偏移量(因为它寻找未处理的消息,包括失败的消息),最终立即重试失败的消息。消费者不断地来回并重试百万次。
如果我有机会了解哪个记录失败了SeekToCurrentErrorHandler,那么我将创建一个自定义实现SeekToCurrentErrorHandler来检查失败的消息是否可重试(通过使用该thrownException字段)。如果它不可重试,那么我会将它从列表中删除records以寻找回来。
关于如何实现此功能的任何想法?
注:enable.auto.commit设为false,auto.offset.reset设为earliest。
谢谢!
我有一个 Spring Boot 应用程序,它定义了:
(我已经改变了后缀,以避免任何可能的混乱,手工创建的主题,否则我得到一个警告,STREAM_TOPIC_IN_ XXX = LEADER_NOT_AVAILABLE和流不会为一分钟左右运行。)
第一个侦听器和流似乎正在工作,但是当 STREAM_OUT_TOPIC 上的侦听器尝试反序列化消息时,我收到以下异常。我正在使用 Produced.with 提供流中的 serde。我需要做什么才能让侦听器知道要反序列化的类型?
日志
11 Mar 2019 14:34:00,194 DEBUG [KafkaMessageController [] http-nio-8080-exec-1] Sending a Kafka Message
11 Mar 2019 14:34:00,236 INFO [KafkaConfig [] kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1] -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
11 Mar 2019 14:34:00,241 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: ConsumerRecord: {}ConsumerRecord(topic = STREAM_TOPIC_IN_QQQ, partition …Run Code Online (Sandbox Code Playgroud) 我的spring-boot application(consumer)进程消息来自Apache Kafka. 周期性地,按摩无法处理并且消费者抛出异常。无论如何,消费者提交抵消。我可以区分 Kafka 中的成功消息和失败消息吗?我想,我不能。这是真的吗?如果是真的,我有一个主要问题:
如何重试失败消息?我知道一些方法,但我不确定它们的正确性。
1) 将偏移量更改为早期。但通过这种方式,成功消息也会重试。
2)当我捕获异常时,我将此消息发送到另一个主题(例如错误主题)。但看起来很难。
3)别的东西(你的变体)
我正在开发一个 Kafka-Stream 应用程序,它将从输入 Kafka 主题中读取消息并过滤不需要的数据并推送到输出 Kafka 主题。
卡夫卡流配置:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> streamsConfiguration = new HashMap<>();
streamsConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "QC-NormalizedEventProcessor-v1.0.0");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9072");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), -1);
streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaConsumerProperties.getConsumerJKSFileLocation());
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConsumerProperties.getConsumerJKSPwd());
streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
streamsConfiguration.put(SASL_MECHANISM, "PLAIN");
return new KafkaStreamsConfiguration(streamsConfiguration);
}
Run Code Online (Sandbox Code Playgroud)
KStream过滤逻辑:
@Bean
public KStream<String, String> kStreamJson(StreamsBuilder builder) {
KStream<String, String> stream = builder.stream(kafkaConsumerProperties.getConsumerTopic(), Consumed.with(Serdes.String(), Serdes.String()));
/** Printing the source message */
stream.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " …Run Code Online (Sandbox Code Playgroud) 我使用 kafka_2.11-2.1.1 和 Producer 使用 spring 2.1.0.RELEASE。
我在向 Kafka 主题发送消息时正在使用 spring,我的生产者生成了很多 TimeoutExceptions
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for COMPANY_INBOUND--19: 229 ms has passed since batch creation plus linger time
Run Code Online (Sandbox Code Playgroud)
我正在使用以下 kafka 生产者设置
acks: 1
retries: 1
batchSize: 100
lingerMs: 5
bufferMemory: 33554432
requestTimeoutMs: 60
Run Code Online (Sandbox Code Playgroud)
我尝试了很多组合(特别是batchSize& lingerMs)但没有任何效果。任何帮助请问上述场景的设置应该是什么。
使用以下配置再次尝试......但没有运气同样的错误
acks = 1
batch.size = 15
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms …Run Code Online (Sandbox Code Playgroud)