Rat*_*tha 7 java avro apache-kafka kafka-consumer-api
I am able to publish my Java bean class to Kafka as an Avro record. But when I try to consume it, I get a ClassCastException. Why does this happen?
Producer
Schema schema = new Schema.Parser().parse(new File("/schemas/avro_schemas/test_schema.avsc"));
GenericRecord payload = new GenericData.Record(schema);
payload.put("name", fileName);
payload.put("timestamp", dateTime.toString());
payload.put("source", source);
payload.put("content", buf);
payload.put("customerCode", customercode);
producer.publish(topic, payload, schema);
Consumer
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
    try {
        byte[] received_message = it.next().message();
        Schema schema = new Schema.Parser().parse(new File("/schemas/avro_schemas/test_schema.avsc"));
        DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(received_message, null);
        GenericRecord payload = reader.read(null, decoder);
Exception
java.lang.ClassCastException: com.xxx.File cannot be cast to org.apache.avro.generic.IndexedRecord
    at org.apache.avro.generic.GenericData.setField(GenericData.java:573)
    at org.apache.avro.generic.GenericData.setField(GenericData.java:590)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at com.xxx.listener.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:56)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.
Avro schema
{
    "namespace": "com.xx",
    "type": "record",
    "name": "File",
    "fields": [
        {
            "name": "name", "type": "string"
        },
        {
            "name": "timestamp", "type": "string"
        },
        {
            "name": "source", "type": "string"
        },
        {
            "name": "content", "type": "bytes"
        },
        {
            "name": "customerCode", "type": "string"
        }
    ]
}