从Apache Camel中的特定Offset开始阅读Kafka主题

Gat*_*sko 6 java apache-camel apache-kafka

我阅读了Camel Kafka的所有文档,我读到的唯一方法是来自git和指定的路由构建器

    public void configure() throws Exception {
                    from("kafka:" + TOPIC
                                 + "?groupId=A"
                                 + "&autoOffsetReset=earliest"             // Ask to start from the beginning if we have unknown offset
                                 + "&consumersCount=2"                     // We have 2 partitions, we want 1 consumer per partition
                                 + "&offsetRepository=#offset")            // Keep the offset in our repository
                            .to("mock:result");

}
Run Code Online (Sandbox Code Playgroud)

但是对于客户的订单,我需要使用Spring,所以我的kafka端点就是这样

<!--DEFINE KAFKA'S TOPCIS AS ENDPOINT-->
        <endpoint id="tagBlink" uri="kafka:10.0.0.165:9092">
            <property key="topic" value="tagBlink"/>
            <property key="brokers" value="10.0.0.165:9092"/>
            <property key="offsetRepository" value="100"/>
        </endpoint>
Run Code Online (Sandbox Code Playgroud)

但得到一个例外

无法为属性找到合适的setter:offsetRepository,因为没有相同类型的setter方法:java.lang.String也不能进行类型转换:没有类型转换器可用于从类型:java.lang.String转换为所需类型:org.apache.camel.spi.StateRepository,值为100

这是我的当前配置吗?如何从特定偏移恢复??

Gat*_*sko 2

这次之后我成功地处理了这个问题。我为此遵循了 Spring Bean 创建,并检查了文档中的“FileStateRepository我需要一个文件”,因此我创建了一个文件 Bean 并添加为构造函数参数。之后我添加了一个init-method="doStart". 该方法如果存在则加载文件,如果不存在则创建该文件。

     <endpoint id="event" uri="kafka:localhost:9092">
        <property key="topic" value="eventTopic4"/>
        <property key="brokers" value="localhost:9092"/>
        <property key="autoOffsetReset" value="earliest"/>
        <property key="offsetRepository" value="#myRepo2"/>
    </endpoint>

    <bean id="myFileOfMyRepo" class="java.io.File">
        <constructor-arg type="java.lang.String" value="C:\repoDat\repo.dat"/>
    </bean>

    <bean id="myRepo2" class="org.apache.camel.impl.FileStateRepository " factory-method="fileStateRepository" init-method="doStart">
        <constructor-arg ref="myFileOfMyRepo"/>
    </bean>
Run Code Online (Sandbox Code Playgroud)

之后我在Git中看到了Camel的KafkaConsumer的代码。

    offsetRepository.getState(serializeOffsetKey(topicPartition));
    if (offsetState != null && !offsetState.isEmpty()) {
        // The state contains the last read offset so you need to seek from the next one
        long offset = deserializeOffsetValue(offsetState) + 1;
        log.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
        consumer.seek(topicPartition, offset);
    } 
Run Code Online (Sandbox Code Playgroud)

这样我就成功地从最后一个偏移量开始读取。我希望 Camel 文档为 Kafka 添加这个额外的步骤。