我目前正在研究基于kafka并由camel和Spring管理的消息总线。我有一个 XML 路由定义来轮询事件并从外部 API 检索相应的完整业务对象,如下所示:
`
<route id="station-event-enrich-route" autoStartup="true" >
<from
uri="kafka:{{kafka.cluster.url}}?brokers={{kafka.cluster.url}}&topic={{events.topic.name}}&autoCommitEnable=false&allowManualCommit=true&maxPollRecords={{station.brocker.bulk.limit}}&groupId={{kafka.groupId}}" />
<!-- SNIP logic to aggregate several events -->
<pollEnrich strategyRef="keepHeadersAggregationStrategy">
<simple>{{api.url}}?view=full&id=$simple{in.headers.BUSINESS_ID}</simple>
</pollEnrich>
<!-- SNIP logic to split the retrieved events according to their ids -->
<to uri="velocity:velocity/resource-object.vm"/>
<removeHeaders pattern="*" excludePattern="MANUAL_COMMIT"/>
<to uri="kafka:{{kafka.cluster.url}}?brokers={{kafka.cluster.url}}&topic={{objects.topic.name}}&groupId={{kafka.groupId}}&requestRequiredAcks=all" />
<transform>
<simple>${headers.MANUAL_COMMIT.commitSync()}</simple>
</transform>
</route>
Run Code Online (Sandbox Code Playgroud)
` 我的问题如下:轮询kafka事件主题时,如果我的pollEnrich中的api.url不可用,则没有检索到业务对象并且事件丢失。所以我需要实现一个事务逻辑,以便能够在我的路由中回滚初始 kafka 轮询,以便可以多次轮询相同的事件,直到 api.url 向我发送等待的业务对象。
我尝试了几种方法,从将我的 org.apache.camel:camel-kafka 版本更新到 2.22.0 开始,以便能够进行手动提交。然后,我尝试实现一个基本的错误处理程序(配置为 maximumRedeliveries=-1 以进行无限重试),以便当 pollEnrich 触发 onException 时,我可以设置一个标头以避免执行最终的手动提交。显然,它有效,但我的事件再也不会被重新轮询。
我还尝试将事务标记与 spring-kafka 的 org.springframework.kafka.transaction.KafkaTransactionManager 实例一起使用,但这不是好方法,因为只有生产者是事务性的。
我缺少什么,正确的方法是什么?
我使用 Java 8、Camel 2.22.0 和 Spring 4.3.18.RELEASE(不推荐,但它应该可以工作)。