小编Jul*_*ien的帖子

如何从 Camel 以事务方式轮询 Kafka?

我目前正在研究基于kafka并由camel和Spring管理的消息总线。我有一个 XML 路由定义来轮询事件并从外部 API 检索相应的完整业务对象,如下所示:

`

<route id="station-event-enrich-route" autoStartup="true" >
        <from
            uri="kafka:{{kafka.cluster.url}}?brokers={{kafka.cluster.url}}&amp;topic={{events.topic.name}}&amp;autoCommitEnable=false&amp;allowManualCommit=true&amp;maxPollRecords={{station.brocker.bulk.limit}}&amp;groupId={{kafka.groupId}}" />

        <!-- SNIP logic to aggregate several events -->

        <pollEnrich strategyRef="keepHeadersAggregationStrategy">
            <simple>{{api.url}}?view=full&amp;id=$simple{in.headers.BUSINESS_ID}</simple>
        </pollEnrich>

        <!-- SNIP logic to split the retrieved events according to their ids -->

        <to uri="velocity:velocity/resource-object.vm"/>    

        <removeHeaders pattern="*" excludePattern="MANUAL_COMMIT"/>

        <to uri="kafka:{{kafka.cluster.url}}?brokers={{kafka.cluster.url}}&amp;topic={{objects.topic.name}}&amp;groupId={{kafka.groupId}}&amp;requestRequiredAcks=all" />

        <transform>
            <simple>${headers.MANUAL_COMMIT.commitSync()}</simple>
        </transform>
</route>
Run Code Online (Sandbox Code Playgroud)

` 我的问题如下:轮询kafka事件主题时,如果我的pollEnrich中的api.url不可用,则没有检索到业务对象并且事件丢失。所以我需要实现一个事务逻辑,以便能够在我的路由中回滚初始 kafka 轮询,以便可以多次轮询相同的事件,直到 api.url 向我发送等待的业务对象。

我尝试了几种方法,从将我的 org.apache.camel:camel-kafka 版本更新到 2.22.0 开始,以便能够进行手动提交。然后,我尝试实现一个基本的错误处理程序(配置为 maximumRedeliveries=-1 以进行无限重试),以便当 pollEnrich 触发 onException 时,我可以设置一个标头以避免执行最终的手动提交。显然,它有效,但我的事件再也不会被重新轮询。

我还尝试将事务标记与 spring-kafka 的 org.springframework.kafka.transaction.KafkaTransactionManager 实例一起使用,但这不是好方法,因为只有生产者是事务性的。

我缺少什么,正确的方法是什么?

我使用 Java 8、Camel 2.22.0 和 Spring 4.3.18.RELEASE(不推荐,但它应该可以工作)。

java apache-camel apache-kafka

4
推荐指数
1
解决办法
5154
查看次数

标签 统计

apache-camel ×1

apache-kafka ×1

java ×1