use*_*594 0 java spring-boot kafka-consumer-api spring-kafka
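I have a Spring Boot application configured with spring-kafka, and I want to handle the various errors that can occur while listening to a topic. If a message is missed or cannot be consumed because of a deserialization failure or any other exception, it should be retried 2 times, after which the message should be logged to an error file. There are two approaches I could follow: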
Approach 1 (using SeekToCurrentErrorHandler with a DeadLetterPublishingRecoverer):
@Autowired
KafkaTemplate<String, Object> template;

@Bean(name = "kafkaSourceProvider")
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
    Map<String, Object> config = appProperties.getSource()
            .getProperties();
    ConcurrentKafkaListenerContainerFactory<K, V> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
            (r, e) -> {
                if (e instanceof FooException) {
                    return new TopicPartition(r.topic() + ".DLT", r.partition());
                }
                // The lambda must return on every path; other exceptions are
                // routed to the same dead-letter topic here.
                return new TopicPartition(r.topic() + ".DLT", r.partition());
            });
    // FixedBackOff(0L, 2L): no delay between attempts, 2 retries after the first failure
    ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));
    factory.setErrorHandler(errorHandler);
    return factory;
}
But for this we need to add a topic (a new .DLT topic), and only then can we log the message to a file.
@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
    return new KafkaAdmin(configs);
}

@KafkaListener(topics = MY_TOPIC + ".DLT", groupId = MY_ID)
public void listenDlt(ConsumerRecord<String, SomeClassName> consumerRecord,
        @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String exceptionStackTrace) {
    logger.error(exceptionStackTrace);
}
Approach 2 (using a custom SeekToCurrentErrorHandler):
@Bean
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
    Map<String, Object> config = appProperties.getSource()
            .getProperties();
    ConcurrentKafkaListenerContainerFactory<K, V> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
    factory.setErrorHandler(new CustomSeekToCurrentErrorHandler());
    factory.setRetryTemplate(retryTemplate());
    return factory;
}

private RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    retryTemplate.setRetryPolicy(aSimpleReturnPolicy);
    return retryTemplate; // the method is declared to return the template
}

public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

    private static final int MAX_RETRY_ATTEMPTS = 2;

    CustomSeekToCurrentErrorHandler() {
        super(MAX_RETRY_ATTEMPTS);
    }

    @Override
    public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        try {
            if (!records.isEmpty()) {
                log.warn("Exception: {} occurred with message: {}", exception, exception.getMessage());
                super.handle(exception, records, consumer, container);
            }
        } catch (SerializationException e) {
            log.warn("Exception: {} occurred with message: {}", e, e.getMessage());
        }
    }
}
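Can anyone advise on the standard way to implement this kind of functionality? With the first approach we see the overhead of creating the .DLT topic and an additional @KafkaListener. With the second approach we can log the exception for the consumer record directly.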
With the first approach, it is not necessary to use a DeadLetterPublishingRecoverer; you can use any ConsumerRecordRecoverer you want. In fact, the default recoverer simply logs the failed message.
/**
 * Construct an instance with the default recoverer which simply logs the record after
 * the backOff returns STOP for a topic/partition/offset.
 * @param backOff the {@link BackOff}.
 * @since 2.3
 */
public SeekToCurrentErrorHandler(BackOff backOff) {
    this(null, backOff);
}
And, in FailedRecordTracker:
...
if (recoverer == null) {
    this.recoverer = (rec, thr) -> {
        ...
        logger.error(thr, "Backoff "
            + (failedRecord == null
                ? "none"
                : failedRecord.getBackOffExecution())
            + " exhausted for " + ListenerUtils.recordToString(rec));
    };
}
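To tie this back to the question (two retries, then log to an error file), a custom recoverer might look roughly like the sketch below. The class name `LoggingRecovererConfig` and the logger name `kafka-error-file` are assumptions made for illustration; routing that logger to a file would be done in the logging configuration, which is not shown.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical configuration class; the name is not from the original post.
@Configuration
public class LoggingRecovererConfig {

    // Assumed logger name; point it at a file appender in your logging setup.
    private static final Logger ERROR_FILE_LOG = LoggerFactory.getLogger("kafka-error-file");

    @Bean
    public ErrorHandler errorHandler() {
        // Called once the back-off is exhausted for a record.
        ConsumerRecordRecoverer recoverer = (record, ex) ->
                ERROR_FILE_LOG.error("Giving up on {}-{}@{}",
                        record.topic(), record.partition(), record.offset(), ex);
        // No delay between attempts, 2 retries after the initial failure.
        return new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));
    }
}
```

With a hand-built container factory like the one in the question, the same handler would be passed to `factory.setErrorHandler(...)`; no .DLT topic or extra @KafkaListener is involved.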
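The back-off (and the limit on retries) was added to the error handler after retry had been added to the listener adapter, so it is "newer" (and preferred).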
Also, using in-memory retry can cause problems with rebalancing if a long BackOff is used.
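The arithmetic behind that concern can be sketched as follows. This is only an illustration under stated assumptions: Kafka's default `max.poll.interval.ms` of 300000 ms, a hypothetical 2-minute back-off with 3 retries, and listener processing time ignored. With in-memory retry the consumer thread sleeps through all the delays without polling, so the delays add up; as far as I understand it, the error handler seeks and polls again after each delay, so only a single delay has to fit. The class and method names are made up for the example.

```java
public class RetryBlockingTime {

    /** Total time the consumer thread is blocked when all retries run without polling. */
    public static long totalBlockedMillis(long backOffMillis, int retries) {
        return backOffMillis * retries;
    }

    /** True if a pause of this length between polls would exceed max.poll.interval.ms. */
    public static boolean exceedsPollInterval(long pauseMillis, long maxPollIntervalMillis) {
        return pauseMillis > maxPollIntervalMillis;
    }

    public static void main(String[] args) {
        long maxPollIntervalMillis = 300_000L; // Kafka default for max.poll.interval.ms
        long backOffMillis = 120_000L;         // hypothetical 2-minute back-off
        int retries = 3;                       // hypothetical retry count

        long inMemoryPause = totalBlockedMillis(backOffMillis, retries);
        System.out.println("In-memory retry pause: " + inMemoryPause + " ms, too long: "
                + exceedsPollInterval(inMemoryPause, maxPollIntervalMillis));
        System.out.println("Single back-off pause: " + backOffMillis + " ms, too long: "
                + exceedsPollInterval(backOffMillis, maxPollIntervalMillis));
    }
}
```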
Finally, only the SeekToCurrentErrorHandler can deal with deserialization problems (via the ErrorHandlingDeserializer).
EDIT
Use the ErrorHandlingDeserializer together with the SeekToCurrentErrorHandler. Deserialization exceptions are considered fatal, and the recoverer is called immediately.
See the documentation.
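The retry-then-recover rule can also be modelled without Kafka at all. The sketch below is a simplified stand-in, not Spring's implementation: `FatalException` plays the role of a deserialization failure, `deliver` is a hypothetical helper, and the `recovered` list stands in for whatever the recoverer does (for example, logging).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RetryThenRecoverSketch {

    /** Stand-in for a failure that is never retried (such as a deserialization error). */
    public static class FatalException extends RuntimeException {
        public FatalException(String message) {
            super(message);
        }
    }

    /**
     * Delivers the record, retrying up to maxRetries times on ordinary exceptions.
     * Fatal exceptions go straight to recovery. Returns the number of delivery attempts.
     */
    public static int deliver(String record, Consumer<String> listener,
            int maxRetries, List<String> recovered) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.accept(record);
                return attempts;             // success
            } catch (FatalException e) {
                recovered.add(record);       // no retries for fatal failures
                return attempts;
            } catch (RuntimeException e) {
                if (attempts > maxRetries) { // initial attempt plus maxRetries retries used up
                    recovered.add(record);
                    return attempts;
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> recovered = new ArrayList<>();
        int ordinary = deliver("bad", r -> { throw new IllegalStateException("boom"); }, 2, recovered);
        int fatal = deliver("junk", r -> { throw new FatalException("cannot deserialize"); }, 2, recovered);
        System.out.println("ordinary failure attempts: " + ordinary);
        System.out.println("fatal failure attempts: " + fatal);
        System.out.println("recovered: " + recovered);
    }
}
```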
Here is a simple Spring Boot application that demonstrates it:
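package com.example.demo;

import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

@SpringBootApplication
public class So63236346Application {

    private static final Logger log = LoggerFactory.getLogger(So63236346Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63236346Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63236346").partitions(1).replicas(1).build();
    }

    // Custom recoverer: log the failed record and the exception message
    @Bean
    ErrorHandler errorHandler() {
        return new SeekToCurrentErrorHandler((rec, ex) -> log.error(ListenerUtils.recordToString(rec, true) + "\n"
                + ex.getMessage()));
    }

    @KafkaListener(id = "so63236346", topics = "so63236346")
    public void listen(String in) {
        System.out.println(in);
    }

    // Sends two valid JSON payloads with an undeserializable one in between
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so63236346", "{\"field\":\"value1\"}");
            template.send("so63236346", "junk");
            template.send("so63236346", "{\"field\":\"value2\"}");
        };
    }
}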
package com.example.demo;

public class Thing {

    private String field;

    public Thing() {
    }

    public Thing(String field) {
        this.field = field;
    }

    public String getField() {
        return this.field;
    }

    public void setField(String field) {
        this.field = field;
    }

    @Override
    public String toString() {
        return "Thing [field=" + this.field + "]";
    }
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Thing
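If the consumer factory is built by hand from a property map, as in the question, the same settings could be expressed roughly as below. This is a sketch using the plain Kafka property names that the Boot properties above map to; the class name `ConsumerPropsSketch` is made up, and `bootstrap.servers`, `group.id` and the key deserializer are omitted.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerPropsSketch {

    /** Consumer properties equivalent to the application.properties entries above. */
    public static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("auto.offset.reset", "earliest");
        // The wrapper deserializer catches failures instead of letting poll() throw
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer");
        // The delegate that does the real work
        props.put("spring.deserializer.value.delegate.class",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Target type for the JSON payload
        props.put("spring.json.value.default.type", "com.example.demo.Thing");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting map would be merged into the `config` passed to `new DefaultKafkaConsumerFactory<>(config)`.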
Result
Thing [field=value1]
2020-08-10 14:30:14.780 ERROR 78857 --- [o63236346-0-C-1] com.example.demo.So63236346Application : so63236346-0@7
Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[106, 117, 110, 107]] from topic [so63236346]
2020-08-10 14:30:14.782 INFO 78857 --- [o63236346-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so63236346-1, groupId=so63236346] Seeking to offset 8 for partition so63236346-0
Thing [field=value2]