我开始在我的项目中使用spring-integration-kafka,我可以生成和使用来自Kafka的消息.但是现在,我希望向特定分区生成消息,并且还消耗来自特定分区的消息.
示例我想生成到分区3的消息,而消费只会从分区3接收消息.
到目前为止,我的主题有8个分区,我可以向特定分区生成消息,但我还没有找到配置消费者的方法,只接收来自特定分区的消息.
所以关于如何使用spring-integration-kafka配置使用者的任何建议,或者其他任何需要与KafkaConsumer.java类配合使用的建议都可以从特定分区接收消息.
谢谢.
这是我的代码:
kafka-producer-context.xml
<int:publish-subscribe-channel id="inputToKafka" />
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="kafkaProducerContext"
auto-startup="true" order="1" channel="inputToKafka" />
<int-kafka:producer-context id="kafkaProducerContext"
producer-properties="producerProps">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration
broker-list="127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"
async="true" topic="testTopic"
key-class-type="java.lang.String"
key-encoder="encoder"
value-class-type="java.lang.String"
value-encoder="encoder"
partitioner="partitioner"
compression-codec="default" />
</int-kafka:producer-configurations>
</int-kafka:producer-context>
<util:properties id="producerProps">
<prop key="queue.buffering.max.ms">500</prop>
<prop key="topic.metadata.refresh.interval.ms">3600000</prop>
<prop key="queue.buffering.max.messages">10000</prop>
<prop key="retry.backoff.ms">100</prop>
<prop key="message.send.max.retries">2</prop>
<prop key="send.buffer.bytes">5242880</prop>
<prop key="socket.request.max.bytes">104857600</prop>
<prop key="socket.receive.buffer.bytes">1048576</prop>
<prop key="socket.send.buffer.bytes">1048576</prop>
<prop key="request.required.acks">1</prop>
</util:properties>
<bean id="encoder"
class="org.springframework.integration.kafka.serializer.common.StringEncoder" />
<bean id="partitioner" class="org.springframework.integration.kafka.support.DefaultPartitioner"/>
<task:executor id="taskExecutor" pool-size="5"
keep-alive="120" queue-capacity="500" />
Run Code Online (Sandbox Code Playgroud)
KafkaProducer.java
public class KafkaProducer {
private static final Logger logger …Run Code Online (Sandbox Code Playgroud) spring spring-mvc spring-integration apache-kafka kafka-consumer-api