小编kkf*_*flf的帖子

反序列化后如何处理SerializationException

我在Spring Kafka设置中使用Avro和Schema注册表.

我想以某种方式处理SerializationException,反序列化期间可能会抛出.

我找到了以下两个资源:

https://github.com/spring-projects/spring-kafka/issues/164

如何配置spring-kafka忽略错误格式的消息?

这些资源表明我返回null而不是SerializationException在反序列化和监听时抛出KafkaNull.这个解决方案很好用.

但是,我希望能够抛出异常而不是返回null.

KIP-161KIP-210为处理异常提供了更好的功能.我确实在Spring Cloud中找到了一些提到KIP-161的资源,但没有具体说明Spring-Kafka.

有谁知道如何捕获SerializationExceptionSpring Boot?

我使用的是Spring Boot 2.0.2

编辑:我找到了解决方案.

我宁愿抛出异常并抓住它而不是返回null或KafkaNull.我在多个不同的项目中使用我的自定义Avro序列化器和反序列化器,其中一些不是Spring.如果我更改了我的Avro序列化程序和反序列化程序,则需要更改其他一些项目以期望反序列化程序返回null.

我想关闭容器,这样我就不会丢失任何消息.在生产中永远不应该期望SerializationException.只有在Schema Registry关闭或者以某种方式将未格式化的消息发送到生产kafka时,才能发生SerializationException.无论哪种方式,SerializationException应该只发生很少,如果它发生,那么我想关闭容器,以便没有消息丢失,我可以调查问题.

只需考虑将从您的消费者容器中捕获所有异常.在我的具体情况下,我只想关闭它,如果它是一个SerializationException

public class SerializationExceptionHandler extends ContainerStoppingErrorHandler {

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                       MessageListenerContainer container) {

        //Only call super if the exception is SerializationException
        if (thrownException instanceof SerializationException) {
            //This will shutdown the container.
            super.handle(thrownException, records, consumer, container);
        } else {
            //Wrap …
Run Code Online (Sandbox Code Playgroud)

error-handling serialization deserialization spring-kafka

9
推荐指数
1
解决办法
3077
查看次数

站点资源在GetMapping中泄漏到UUID中

我已经对这个问题进行了很多研究,但我没有找到任何提及类似内容的人.

我正在创建一个spring boot应用程序并GetMapping用于映射我的资源.出于某种原因,当我使用UUID作为我的时候,我遇到了奇怪的错误PathVariable.

Controller.java

@GetMapping("/test/{id1}")
public String testMethod(@PathVariable UUID id1){
    return "test";
}    

Request:
/test/b74adb59-af60-423e-a3dd-c2f356114b51
Run Code Online (Sandbox Code Playgroud)

test.html - 使用不存在的样式表.

<html>
    <head>
        <link rel="stylesheet" href="bullshit">
    </head>
    <body>
        <h1>This is a test page</h1>
    </body>
</html>
Run Code Online (Sandbox Code Playgroud)

这将导致以下错误:

org.springframework.web.method.annotation.MethodArgumentTypeMismatchException: Failed to convert value of type 'java.lang.String' to required type 'java.util.UUID'; nested exception is java.lang.IllegalArgumentException: Invalid UUID string: bullshit
2018-02-22 03:49:23.062  WARN 252700 --- [nio-8080-exec-5] .m.m.a.ExceptionHandlerExceptionResolver : Resolved exception caused by Handler execution: org.springframework.web.method.annotation.MethodArgumentTypeMismatchException: Failed to convert value of type 'java.lang.String' to …
Run Code Online (Sandbox Code Playgroud)

html css java spring thymeleaf

8
推荐指数
1
解决办法
416
查看次数

Spring-boot 和 Spring-Kafka 兼容性矩阵

我正在寻找 Spring 框架不同部分的兼容性矩阵。

更具体地说,我正在寻找与 Spring-boot 1.5.2 兼容的最新 Spring-Kafka 版本。

我发现了一个旧的 Spring 兼容性矩阵,但该矩阵是 2014 年的,因此已弃用。

我不关心 Spring-Kafka 和 Apache Kafka 客户端兼容性,也不关心 Apache Kafka java 客户端和 Kafka 代理兼容性。此兼容性矩阵可在此处获取:

https://spring.io/projects/spring-kafka https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix

spring spring-boot spring-kafka

7
推荐指数
1
解决办法
2万
查看次数

卡夫卡交易失败,但无论如何提交抵消

我正试图围绕卡夫卡交易和恰好一次。

我创建了一个事务消费者,我想确保我阅读和处理一个主题的所有消息。如果事务失败并因此丢失消息,Kafka 仍会提交偏移量。

更正式地说,如果流处理应用程序消费消息 A 并生成消息 B,使得 B = F(A),那么恰好一次处理意味着当且仅当 B 成功生成时才认为 A 被消费,反之亦然。 来源

基于此,我假设消息 A 未被消耗,因此将再次重新处理。但是如何重新处理这条消息呢?

我发现很多消息来源表明,如果处理失败,则不会使用该消息。但是我找不到任何提到如果消息未被消耗如何重新处理消息的来源。我似乎认为,如果事务消费者失败,那么 Kafka 将回滚但继续提交偏移量,以便它可以处理下一条消息。但是如果Kafka提交了偏移量,那么之前的消息就丢失了?

我发现一些 Spring Kafka 页面描述了如何处理消费者中的异常。所以你基本上要么停止容器,要么停止整个应用程序。我认为 Kafka 有一些内部机制来处理这种行为,因为文档指出只有成功生成消息 B 才会消耗消息 A。但是如果消息 A 没有被消费,那么 Kafka 仍然会提交偏移量并继续下一条消息。

感觉就像恰好一次只适用于永远不会发生错误的情况。老实说,如果 Kafka 无论如何都会提交偏移量,我并不关心消息是被消费还是不被消费。即使消息没有被消费,消息也会丢失,所以似乎我必须停止容器或应用程序以确保我不会丢失任何消息。

https://spring.io/blog/2017/12/01/spring-for-apache-kafka-2-1-0-release-and-1-3-2-2-0-2-available https:/ /docs.spring.io/spring-kafka/reference/html/_reference.html#annotation-error-handling https://github.com/spring-cloud/spring-cloud-stream/issues/1114

编辑 1:我在下面添加了一些示例代码。

我有一个非常简单的 Spring Boot 应用程序。

@SpringBootApplication
@EnableTransactionManagement
public class KafkaTransactionMysteryApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaTransactionMysteryApplication.class, args);
    }

    @Bean
    @Primary
    public JpaTransactionManager transactionManager() {
        return new JpaTransactionManager();
    }

    @Bean
    public ChainedKafkaTransactionManager chainedTxM(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) …
Run Code Online (Sandbox Code Playgroud)

java spring transactions apache-kafka spring-kafka

5
推荐指数
1
解决办法
7616
查看次数

@KafkaListener 并发多个主题

我想创建一个并发 @KafkaListener,它可以处理多个主题,每个主题都有不同数量的分区。

我注意到 Spring-Kafka 只为大多数分区的主题为每个分区初始化一个使用者。

示例:我将并发设置为 8。我@KafkaListener听了以下主题。主题 A 的分区最多 - 5 个,因此 Spring-Kafka 初始化了 5 个消费者。我希望 Spring-Kafka 初始化 8 个消费者,这是根据我的并发属性允许的最大值。

  • 主题 A 有 5 个分区
  • 主题 B 有 3 个分区
  • 话题 C 有 1

不初始化更多消费者的技术原因是什么?

我如何绕过这个,以便我可以使用@KafkaListener注释初始化更多的使用者?(如果可能的话)

apache-kafka spring-kafka

5
推荐指数
1
解决办法
3166
查看次数

无法捕获 DataIntegrityViolationException

我将 Spring Boot 2 与 spring-boot-starter-data-jpa 与底层 MariaDB 一起使用。

我有一个带有唯一键“用户名”的表。我想DataIntegrityViolationException知道是否违反了这个约束,但似乎 Spring 正在记录DataIntegrityViolationException并且不会重新抛出 after logging(我最好的猜测)。MySQLIntegrityConstraintViolationException而是抛出。

我想赶DataIntegrityViolationExceptionUserService.createUser(..)

下面是几个代码片段:

@Repository
@Transactional(propagation = Propagation.MANDATORY)
public class UserRepository {

    @PersistenceContext
    private EntityManager entityManager;

    public void save(User user) {
        entityManager.persist(user);
    }
}

@Service
@Transactional(value = Transactional.TxType.REQUIRED)
public class UserService {

@Autowired
private UserRepository userRepository;

private void createUser(User user){
    userRepository.save(user);
}
Run Code Online (Sandbox Code Playgroud)

堆栈跟踪:

2018-09-22 14:20:33.163  WARN 10700 --- [nio-8080-exec-1] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 1062, SQLState: 23000
2018-09-22 14:20:33.163 …
Run Code Online (Sandbox Code Playgroud)

spring hibernate jpa spring-boot

1
推荐指数
1
解决办法
2632
查看次数