我正在使用 spring kafka 并面临一些错误
Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1001: org.apache.kafka.common.errors.DisconnectException.
给出了我的消费者生产者代码
` @EnableKafka @Configuration 公共类 KafkaConfig {
@Value(value = "${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${spring.kafka.consumer.registry-server}")
private String registryAddress;
@Value(value = "${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, GenericRecord> consumerFactory() {
final Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 600000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 600000);
final AvroSerde avroSerde = new AvroSerde();
avroSerde.configure(props, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, avroSerde.deserializer().getClass());
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), avroSerde.deserializer()); …Run Code Online (Sandbox Code Playgroud)