Vij*_*ade 2 java kafka-consumer-api
我正在尝试对我的卡夫卡消费者进行单元测试.我正在尝试使用java api MockConsumer附带的类kafka-client.以下是我的配置代码
@Bean
public MockConsumer consumer(){
MockConsumer consumer = new MockConsumer(OffsetResetStrategy.LATEST);
consumer.assign(Arrays.asList(new TopicPartition("test-topic", 0)));
HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(new TopicPartition("test-topic", 0), 0L);
consumer.updateBeginningOffsets(beginningOffsets);
consumer.addRecord(new ConsumerRecord<String, String>("test-topic",0,
0L, "mykey", "myvalue0"));
consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
1L, "mykey", "myvalue1"));
consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
2L, "mykey", "myvalue2"));
consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
3L, "mykey", "myvalue3"));
consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
4L, "mykey", "myvalue4"));
HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(new TopicPartition("test-topic", 0), 4L);
consumer.updateEndOffsets(endOffsets);
return consumer;
}
Run Code Online (Sandbox Code Playgroud)
现在当我在我的测试用例中使用这个MockConsumer Bean时,如下所示
@Autowired
MockConsumer kafkaConsumer;
@Autowired
@InjectMocks
MyConsumer myConsumer; //this is the class having consumer code. This
//is the class under test
@Test
public void testConsumeWithAutoAssignment() throws Exception {
myConsumer.consumeTopic("test-topic");
}
Run Code Online (Sandbox Code Playgroud)
我得到了例外
kafkaConsumer.subscribe(topic)
java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
如果有人发现了问题或解决了这个问题,请告诉我.
ppa*_*rno 10
这是因为在您使用的bean中,consumer.assign(Arrays.asList(new TopicPartition("test-topic", 0)));这意味着消费者想要从"test-topic"中使用特定分区(0).然后在某个地方,但我没有看到你提供的代码在哪里有一个电话subscribe(topic).使用subscribe,消费者成为消费者组的一部分,Kafka代理自动分配分区(用于重新平衡).您不能同时使用两者:分配特定分区(USER DEFINED)和订阅自动分配.
| 归档时间: |
|
| 查看次数: |
3028 次 |
| 最近记录: |