so-*_*ude 7 java apache-camel apache-kafka
我有一个用例:
我需要定期阅读和聚合来自kafka主题的消息,并发布到不同的主题.Localstorage不是一种选择.这就是我计划解决这个问题的方法,欢迎任何改进建议
要安排kafka消息的聚合和发布,请计划使用Aggregator EIP的completionInterval选项.这是代码.
@Autowired ObjectMapper objectMapper;
JacksonDataFormat jacksonDataFormat;
@PostConstruct
public void initialize(){
//objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
jacksonDataFormat = new JacksonDataFormat(objectMapper,EventMessage.class);
}
Run Code Online (Sandbox Code Playgroud)
和路线:
public void configure() throws Exception {
from("kafka:localhost:9092?topic=item-events" +
"&groupId=aggregator-group-id&autoCommitIntervalMs=25000&autoOffsetReset=earliest&consumersCount=1")
.routeId("kafkapoller")
.unmarshal(jacksonDataFormat)
.aggregate(body().method("getItemId"), new EventAggregationStrategy()).completionInterval(20000)
.marshal().json(JsonLibrary.Jackson)
.to("kafka:localhost:9092?topic=item-events-aggregated&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
}
Run Code Online (Sandbox Code Playgroud)
这看起来不错。要记住的事情:
PersistentAggregationRepository存储/重放消息,尽管你可以重放从 kafka 丢失的消息(这将是我最大的操作问题)AggregateController使您能够从外部强制完成交换,这样您就可以执行诸如关闭骆驼之类的操作,然后调用它来完成飞行中的交换| 归档时间: |
|
| 查看次数: |
721 次 |
| 最近记录: |