在Kafka中读取字段'topic_metadata'时出错

use*_*021 8 apache-kafka kafka-producer-api

我正在尝试使用auto.create.topics.enable = true在我的server.properties文件中连接到我的代理.但是当我尝试使用Java客户端生产者连接到代理时,我得到以下内容error.

1197 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.producer.internals.Sender - kafka生产者I/O线程中未捕获的错误:org.apache.kafka.common.protocol.types.SchemaException:读取字段'topic_metadata'时出错:在org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java)的org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)中读取大小为619631的数组,只有37个字节可用时出错:380)org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)位于org.apache.kafka.clients的org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269). producer.internals.Sender.run(Sender.java:229)atg.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)at java.lang.Thread.run(Unknown Source)

以下是我的客户端生产者代码.

public static void main(String[] argv){
         Properties props = new Properties();
         props.put("bootstrap.servers", "http://XX.XX.XX.XX:9092");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 0);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("block.on.buffer.full",true);
         Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try{ for(int i = 0; i < 10; i++)
        { producer.send(new ProducerRecord<String, String>("topicjava", Integer.toString(i), Integer.toString(i)));
             System.out.println("Tried sending:"+i);}
        }
        catch (Exception e){
            e.printStackTrace();
        }
         producer.close();
}
Run Code Online (Sandbox Code Playgroud)

有人可以帮我解决这个问题吗?

San*_*eev 3

我也遇到过类似的问题。这里的问题是,当pom文件中的kafka客户端版本与kafka服务器不同时,kafka客户端版本不匹配。我使用的是kafka客户端0.10.0.0_1,但kafka服务器仍然是0.9.0.0。所以我将kafka服务器版本升级到10,问题得到解决。

<dependency>
            <groupId>org.apache.servicemix.bundles</groupId>
            <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
            <version>0.10.0.0_1</version>
        </dependency>            
Run Code Online (Sandbox Code Playgroud)