我在Spring Kafka设置中使用Avro和Schema注册表.
我想以某种方式处理SerializationException,反序列化期间可能会抛出.
我找到了以下两个资源:
https://github.com/spring-projects/spring-kafka/issues/164
这些资源表明我返回null而不是SerializationException在反序列化和监听时抛出KafkaNull.这个解决方案很好用.
但是,我希望能够抛出异常而不是返回null.
KIP-161和KIP-210为处理异常提供了更好的功能.我确实在Spring Cloud中找到了一些提到KIP-161的资源,但没有具体说明Spring-Kafka.
有谁知道如何捕获SerializationExceptionSpring Boot?
我使用的是Spring Boot 2.0.2
编辑:我找到了解决方案.
我宁愿抛出异常并抓住它而不是返回null或KafkaNull.我在多个不同的项目中使用我的自定义Avro序列化器和反序列化器,其中一些不是Spring.如果我更改了我的Avro序列化程序和反序列化程序,则需要更改其他一些项目以期望反序列化程序返回null.
我想关闭容器,这样我就不会丢失任何消息.在生产中永远不应该期望SerializationException.只有在Schema Registry关闭或者以某种方式将未格式化的消息发送到生产kafka时,才能发生SerializationException.无论哪种方式,SerializationException应该只发生很少,如果它发生,那么我想关闭容器,以便没有消息丢失,我可以调查问题.
只需考虑将从您的消费者容器中捕获所有异常.在我的具体情况下,我只想关闭它,如果它是一个SerializationException
public class SerializationExceptionHandler extends ContainerStoppingErrorHandler {
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
MessageListenerContainer container) {
//Only call super if the exception is SerializationException
if (thrownException instanceof SerializationException) {
//This will shutdown the container.
super.handle(thrownException, records, consumer, container);
} else {
//Wrap …Run Code Online (Sandbox Code Playgroud) 我已经对这个问题进行了很多研究,但我没有找到任何提及类似内容的人.
我正在创建一个spring boot应用程序并GetMapping用于映射我的资源.出于某种原因,当我使用UUID作为我的时候,我遇到了奇怪的错误PathVariable.
Controller.java
@GetMapping("/test/{id1}")
public String testMethod(@PathVariable UUID id1){
return "test";
}
Request:
/test/b74adb59-af60-423e-a3dd-c2f356114b51
Run Code Online (Sandbox Code Playgroud)
test.html - 使用不存在的样式表.
<html>
<head>
<link rel="stylesheet" href="bullshit">
</head>
<body>
<h1>This is a test page</h1>
</body>
</html>
Run Code Online (Sandbox Code Playgroud)
这将导致以下错误:
org.springframework.web.method.annotation.MethodArgumentTypeMismatchException: Failed to convert value of type 'java.lang.String' to required type 'java.util.UUID'; nested exception is java.lang.IllegalArgumentException: Invalid UUID string: bullshit
2018-02-22 03:49:23.062 WARN 252700 --- [nio-8080-exec-5] .m.m.a.ExceptionHandlerExceptionResolver : Resolved exception caused by Handler execution: org.springframework.web.method.annotation.MethodArgumentTypeMismatchException: Failed to convert value of type 'java.lang.String' to …Run Code Online (Sandbox Code Playgroud) 我正在寻找 Spring 框架不同部分的兼容性矩阵。
更具体地说,我正在寻找与 Spring-boot 1.5.2 兼容的最新 Spring-Kafka 版本。
我发现了一个旧的 Spring 兼容性矩阵,但该矩阵是 2014 年的,因此已弃用。
我不关心 Spring-Kafka 和 Apache Kafka 客户端兼容性,也不关心 Apache Kafka java 客户端和 Kafka 代理兼容性。此兼容性矩阵可在此处获取:
https://spring.io/projects/spring-kafka https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
我正试图围绕卡夫卡交易和恰好一次。
我创建了一个事务消费者,我想确保我阅读和处理一个主题的所有消息。如果事务失败并因此丢失消息,Kafka 仍会提交偏移量。
更正式地说,如果流处理应用程序消费消息 A 并生成消息 B,使得 B = F(A),那么恰好一次处理意味着当且仅当 B 成功生成时才认为 A 被消费,反之亦然。 来源
基于此,我假设消息 A 未被消耗,因此将再次重新处理。但是如何重新处理这条消息呢?
我发现很多消息来源表明,如果处理失败,则不会使用该消息。但是我找不到任何提到如果消息未被消耗如何重新处理消息的来源。我似乎认为,如果事务消费者失败,那么 Kafka 将回滚但继续提交偏移量,以便它可以处理下一条消息。但是如果Kafka提交了偏移量,那么之前的消息就丢失了?
我发现一些 Spring Kafka 页面描述了如何处理消费者中的异常。所以你基本上要么停止容器,要么停止整个应用程序。我认为 Kafka 有一些内部机制来处理这种行为,因为文档指出只有成功生成消息 B 才会消耗消息 A。但是如果消息 A 没有被消费,那么 Kafka 仍然会提交偏移量并继续下一条消息。
感觉就像恰好一次只适用于永远不会发生错误的情况。老实说,如果 Kafka 无论如何都会提交偏移量,我并不关心消息是被消费还是不被消费。即使消息没有被消费,消息也会丢失,所以似乎我必须停止容器或应用程序以确保我不会丢失任何消息。
https://spring.io/blog/2017/12/01/spring-for-apache-kafka-2-1-0-release-and-1-3-2-2-0-2-available https:/ /docs.spring.io/spring-kafka/reference/html/_reference.html#annotation-error-handling https://github.com/spring-cloud/spring-cloud-stream/issues/1114
编辑 1:我在下面添加了一些示例代码。
我有一个非常简单的 Spring Boot 应用程序。
@SpringBootApplication
@EnableTransactionManagement
public class KafkaTransactionMysteryApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaTransactionMysteryApplication.class, args);
}
@Bean
@Primary
public JpaTransactionManager transactionManager() {
return new JpaTransactionManager();
}
@Bean
public ChainedKafkaTransactionManager chainedTxM(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) …Run Code Online (Sandbox Code Playgroud) 我想创建一个并发 @KafkaListener,它可以处理多个主题,每个主题都有不同数量的分区。
我注意到 Spring-Kafka 只为大多数分区的主题为每个分区初始化一个使用者。
示例:我将并发设置为 8。我@KafkaListener听了以下主题。主题 A 的分区最多 - 5 个,因此 Spring-Kafka 初始化了 5 个消费者。我希望 Spring-Kafka 初始化 8 个消费者,这是根据我的并发属性允许的最大值。
不初始化更多消费者的技术原因是什么?
我如何绕过这个,以便我可以使用@KafkaListener注释初始化更多的使用者?(如果可能的话)
我将 Spring Boot 2 与 spring-boot-starter-data-jpa 与底层 MariaDB 一起使用。
我有一个带有唯一键“用户名”的表。我想DataIntegrityViolationException知道是否违反了这个约束,但似乎 Spring 正在记录DataIntegrityViolationException并且不会重新抛出 after logging(我最好的猜测)。MySQLIntegrityConstraintViolationException而是抛出。
我想赶DataIntegrityViolationException在UserService.createUser(..)。
下面是几个代码片段:
@Repository
@Transactional(propagation = Propagation.MANDATORY)
public class UserRepository {
@PersistenceContext
private EntityManager entityManager;
public void save(User user) {
entityManager.persist(user);
}
}
@Service
@Transactional(value = Transactional.TxType.REQUIRED)
public class UserService {
@Autowired
private UserRepository userRepository;
private void createUser(User user){
userRepository.save(user);
}
Run Code Online (Sandbox Code Playgroud)
堆栈跟踪:
2018-09-22 14:20:33.163 WARN 10700 --- [nio-8080-exec-1] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 1062, SQLState: 23000
2018-09-22 14:20:33.163 …Run Code Online (Sandbox Code Playgroud) spring ×4
spring-kafka ×4
apache-kafka ×2
java ×2
spring-boot ×2
css ×1
hibernate ×1
html ×1
jpa ×1
thymeleaf ×1
transactions ×1