Posts by Eda*_*yan

Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1001: org.apache.kafka.common.errors.DisconnectException

I am using Spring Kafka and keep running into the following error:

Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1001: org.apache.kafka.common.errors.DisconnectException.

My consumer/producer configuration code is given below:

@EnableKafka
@Configuration
public class KafkaConfig {

    @Value(value = "${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${spring.kafka.consumer.registry-server}")
    private String registryAddress;

    @Value(value = "${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, GenericRecord> consumerFactory() {
        final Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 600000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 600000);


        final AvroSerde avroSerde = new AvroSerde();
        avroSerde.configure(props, false);

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, avroSerde.deserializer().getClass());
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), avroSerde.deserializer()); …
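One thing worth checking in the configuration above: `FETCH_MAX_WAIT_MS_CONFIG` and `REQUEST_TIMEOUT_MS_CONFIG` are both set to 600000. The broker may hold a fetch request for up to `fetch.max.wait.ms` when no data is available, while the client gives up on a request after `request.timeout.ms`, so equal values can let the client time out and disconnect just as the broker is about to reply. This is only a possible cause, not something the post confirms. The sketch below is a dependency-free illustration of that check; the class `FetchTimeoutCheck` and its helper methods are hypothetical names, and 500 / 30000 are, to my knowledge, the defaults in recent Kafka client versions.

```java
import java.util.HashMap;
import java.util.Map;

public class FetchTimeoutCheck {

    // Builds only the two timeout-related consumer properties, using the
    // plain config key strings so the sketch needs no Kafka dependency.
    static Map<String, Object> consumerTimeouts(int fetchMaxWaitMs, int requestTimeoutMs) {
        Map<String, Object> props = new HashMap<>();
        props.put("fetch.max.wait.ms", fetchMaxWaitMs);
        props.put("request.timeout.ms", requestTimeoutMs);
        return props;
    }

    // Hypothetical helper: true when the broker-side fetch wait is strictly
    // shorter than the client-side request timeout.
    static boolean fetchWaitFitsInRequestTimeout(Map<String, Object> props) {
        int fetchMaxWaitMs = (Integer) props.get("fetch.max.wait.ms");
        int requestTimeoutMs = (Integer) props.get("request.timeout.ms");
        return fetchMaxWaitMs < requestTimeoutMs;
    }

    public static void main(String[] args) {
        // Values from the question: both timeouts are 600000 ms.
        System.out.println(fetchWaitFitsInRequestTimeout(consumerTimeouts(600000, 600000)));
        // Assumed client defaults: 500 ms fetch wait, 30000 ms request timeout.
        System.out.println(fetchWaitFitsInRequestTimeout(consumerTimeouts(500, 30000)));
    }
}
```

If the check fails for your settings, lowering `fetch.max.wait.ms` well below `request.timeout.ms` (or removing both overrides) is a cheap experiment before looking at network or broker-side causes.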

apache-kafka kafka-consumer-api spring-kafka
