Gan*_*alf 8 java apache-kafka kafka-consumer-api
我试图在Kafka 9中使用SimpleConsumer来允许用户从时间偏移重放事件 - 但是我从Kafka收到的消息是一个非常奇怪的编码:
7icf-test-testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7\?W>8??????{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819330373,"context":{"userid":0,"username":"testUser"}}?!}?a?????{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819331637,"context":{"userid":1,"username":"testUser"}}???r?????{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819332754,"context":{"userid":2,"username":"testUser"}}????????{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819333868,"context":{"userid":3,"username":"testUser"}}?p=
??????{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819334997,"context":{"userid":4,"username"
Run Code Online (Sandbox Code Playgroud)
使用KafkaConsumer这个消息解析得很好.这是我用来使用SimpleConsumer检索消息的代码:
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset < readOffset) {
log.debug("Found an old offset - skip");
continue;
}
readOffset = messageAndOffset.nextOffset();
int payloadOffset = 14 + messageAndOffset.message().keySize(); // remove first x bytes, schema Id
byte[] data = messageAndOffset.message().payload().array();
byte[] realData = Arrays.copyOfRange(data, payloadOffset, data.length - payloadOffset);
log.debug("Read " + new String(realData, "UTF-8"));
}
Run Code Online (Sandbox Code Playgroud)
在我不断收到关于字节太高的UTF-32错误之后,我添加了代码以跳过前x个字节,我假设这是因为Kafka会将有关消息大小的信息添加到有效负载中.这是Avro神器吗?
我从来没有找到一个好的答案 - 但我转而使用SimpleConsumer Kafka 来查询我需要的偏移量(每个分区......尽管实现很差),然后使用本机 KafkaConsumer 使用seek(TopicPartition, offset)或 来seekToBeginning(TopicPartition) 检索消息。希望他们能够在下一个版本中向本机客户端添加从给定时间戳检索消息的功能。
| 归档时间: |
|
| 查看次数: |
1239 次 |
| 最近记录: |