kafka MockConsumer抛出异常错误java.lang.IllegalStateException:对主题,分区和模式的订阅是互斥的

Vij*_*ade 2 java kafka-consumer-api

我正在尝试对我的卡夫卡消费者进行单元测试.我正在尝试使用java api MockConsumer附带的类kafka-client.以下是我的配置代码

@Bean
public MockConsumer consumer(){

    MockConsumer consumer = new MockConsumer(OffsetResetStrategy.LATEST);
    consumer.assign(Arrays.asList(new TopicPartition("test-topic", 0)));

    HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
    beginningOffsets.put(new TopicPartition("test-topic", 0), 0L);
    consumer.updateBeginningOffsets(beginningOffsets);

    consumer.addRecord(new ConsumerRecord<String, String>("test-topic",0,
            0L, "mykey", "myvalue0"));
    consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
            1L, "mykey", "myvalue1"));
    consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
            2L, "mykey", "myvalue2"));
    consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
            3L, "mykey", "myvalue3"));
    consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
            4L, "mykey", "myvalue4"));
    HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
    endOffsets.put(new TopicPartition("test-topic", 0), 4L);
    consumer.updateEndOffsets(endOffsets);
    return consumer;
}
Run Code Online (Sandbox Code Playgroud)

现在当我在我的测试用例中使用这个MockConsumer Bean时,如下所示

@Autowired
MockConsumer kafkaConsumer;

@Autowired
@InjectMocks
MyConsumer myConsumer; //this is the class having consumer code. This 
                      //is the class under test

@Test
public void testConsumeWithAutoAssignment() throws Exception {
  myConsumer.consumeTopic("test-topic");
}
Run Code Online (Sandbox Code Playgroud)

我得到了例外

kafkaConsumer.subscribe(topic)

java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive

如果有人发现了问题或解决了这个问题,请告诉我.

ppa*_*rno 10

这是因为在您使用的bean中,consumer.assign(Arrays.asList(new TopicPartition("test-topic", 0)));这意味着消费者想要从"test-topic"中使用特定分区(0).然后在某个地方,但我没有看到你提供的代码在哪里有一个电话subscribe(topic).使用subscribe,消费者成为消费者组的一部分,Kafka代理自动分配分区(用于重新平衡).您不能同时使用两者:分配特定分区(USER DEFINED)和订阅自动分配.