Apache Camel Kafka - 聚合kafka消息并定期发布到不同的主题

so-*_*ude 7 java apache-camel apache-kafka

我有一个用例:

我需要定期阅读和聚合来自kafka主题的消息,并发布到不同的主题.Localstorage不是一种选择.这就是我计划解决这个问题的方法,欢迎任何改进建议

要安排kafka消息的聚合和发布,请计划使用Aggregator EIP的completionInterval选项.这是代码.

  @Autowired ObjectMapper objectMapper;
  JacksonDataFormat jacksonDataFormat;

  @PostConstruct
  public void initialize(){
    //objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
    jacksonDataFormat = new JacksonDataFormat(objectMapper,EventMessage.class);
  }
Run Code Online (Sandbox Code Playgroud)

和路线:

public void configure() throws Exception {
    from("kafka:localhost:9092?topic=item-events" +
            "&groupId=aggregator-group-id&autoCommitIntervalMs=25000&autoOffsetReset=earliest&consumersCount=1")
            .routeId("kafkapoller")
            .unmarshal(jacksonDataFormat)
            .aggregate(body().method("getItemId"), new EventAggregationStrategy()).completionInterval(20000)
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:localhost:9092?topic=item-events-aggregated&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
  }
Run Code Online (Sandbox Code Playgroud)

str*_*y05 3

这看起来不错。要记住的事情:

  • 如果/当 JVM 在聚合周期中途终止时会发生什么?不关心,那就冷静,否则你可能想要调查PersistentAggregationRepository存储/重放消息,尽管你可以重放从 kafka 丢失的消息(这将是我最大的操作问题)
  • 接下来,考虑运行时控制。Camel 令人震惊的是它没有真正清楚地告诉你运行时发生了什么。诸如聚合器中的失控方法(即非常贪婪的正则表达式)之类的事情会让您对聚合交换的当前状态知之甚少,并且 JMX 可能不会告诉您太多有关正在发生的事情。
  • 我将使用 aAggregateController使您能够从外部强制完成交换,这样您就可以执行诸如关闭骆驼之类的操作,然后调用它来完成飞行中的交换