小编Seb*_* Le的帖子

spring-integration-kafka配置使用者从指定分区接收消息

我开始在我的项目中使用spring-integration-kafka,我可以生成和使用来自Kafka的消息.但是现在,我希望向特定分区生成消息,并且还消耗来自特定分区的消息.

示例我想生成到分区3的消息,而消费只会从分区3接收消息.

到目前为止,我的主题有8个分区,我可以向特定分区生成消息,但我还没有找到配置消费者的方法,只接收来自特定分区的消息.

所以关于如何使用spring-integration-kafka配置使用者的任何建议,或者其他任何需要与KafkaConsumer.java类配合使用的建议都可以从特定分区接收消息.

谢谢.

这是我的代码:

kafka-producer-context.xml

<int:publish-subscribe-channel id="inputToKafka" />

<int-kafka:outbound-channel-adapter
    id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="kafkaProducerContext"
    auto-startup="true" order="1" channel="inputToKafka" />
<int-kafka:producer-context id="kafkaProducerContext"
    producer-properties="producerProps">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration 
            broker-list="127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"
            async="true" topic="testTopic"
            key-class-type="java.lang.String" 
            key-encoder="encoder"
            value-class-type="java.lang.String" 
            value-encoder="encoder"
            partitioner="partitioner"
            compression-codec="default" />
    </int-kafka:producer-configurations>
</int-kafka:producer-context>

<util:properties id="producerProps">
    <prop key="queue.buffering.max.ms">500</prop>
    <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
    <prop key="queue.buffering.max.messages">10000</prop>
    <prop key="retry.backoff.ms">100</prop>
    <prop key="message.send.max.retries">2</prop>
    <prop key="send.buffer.bytes">5242880</prop>
    <prop key="socket.request.max.bytes">104857600</prop>
    <prop key="socket.receive.buffer.bytes">1048576</prop>
    <prop key="socket.send.buffer.bytes">1048576</prop>
    <prop key="request.required.acks">1</prop>
</util:properties>

<bean id="encoder"
    class="org.springframework.integration.kafka.serializer.common.StringEncoder" />

<bean id="partitioner" class="org.springframework.integration.kafka.support.DefaultPartitioner"/>

<task:executor id="taskExecutor" pool-size="5"
    keep-alive="120" queue-capacity="500" />
Run Code Online (Sandbox Code Playgroud)

KafkaProducer.java

public class KafkaProducer {

private static final Logger logger …
Run Code Online (Sandbox Code Playgroud)

spring spring-mvc spring-integration apache-kafka kafka-consumer-api

6
推荐指数
1
解决办法
9123
查看次数