Nic*_*yan 1 avro apache-kafka apache-nifi
我想使用 apache nifi 将一些通用数据生成到 kafka 主题中,并且我希望这些数据采用 avro 格式。我为它做了什么:
{ "type": "record", "name": "my_schema", "namespace": "my_namespace", "doc": "", "fields": [ { "name": "key", "type": "int" }, { "name": "value", "type": [ "null", "int" ] }, { "name": "event_time", "type": "long" } ] }
ConvertAvroSchema 设置:
PublishKafkaRecord 设置:
AvroReader 设置:
AvroRecordSetWriter 设置:

然后我尝试使用 kafka 流读取它:
public class Test { private final static Logger logger = Logger.getLogger(KafkaFilterUsingCacheAvro.class);
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "registry:8081");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, GenericRecord> source = builder.stream("topic");
source.foreach((k, v) -> logger.info(String.format("[%s]: %s", k, v.toString())));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, properties);
streams.start();
}
Run Code Online (Sandbox Code Playgroud)
}
GenericAvroSerde - https://github.com/JohnReedLOL/kafka-streams/blob/master/src/main/java/io/confluent/examples/streams/utils/GenericAvroSerde.java
结果我得到错误:
由:org.apache.kafka.common.errors.SerializationException:反序列化 id -1 的 Avro 消息时出错
我还尝试在 avroreader\writer 中显式设置 avro 模式,但没有帮助。另外,如果我尝试简单地从主题中读取字节并将其转换为字符串表示,我会得到如下信息:
Objavro.schema{"type":"record","name":"my_schema","namespace":"my_namespace","doc":"","fields":[{"name":"key"," type":"int"},{"name":"value","type":["null","int"]},{"name":"event_time","type":"long"}] }avro.codecsnappyÛ4ým[©q ÃàG0 ê¸ä»/}½{Û4ým[©q ÃàG0
我该如何解决?
在 PublishKafka 处理器中,您的 Avro writer 配置了“Embedded Avro Schema”的“Schema Write Strategy”。这意味着写入 Kafka 的消息是嵌入了完整架构的标准 Avro 消息。
在您的消费者方面(Kafka 流),它看起来似乎希望使用融合模式注册表,在这种情况下,它不希望嵌入 Avro 模式,而是希望使用指定模式 ID 的特殊字节序列,然后是裸阿夫罗消息。
假设您希望保持消费者原样,那么在 NiFi 方面,您将希望将 Avro 编写器的“架构写入策略”更改为“Confluent Schema Registry Reference”。我认为这可能还需要您更改 Avro 阅读器以使用 Confluent Schema Registry 服务访问架构。
或者,也许有一种方法可以让 Kafka Streams 读取嵌入式模式而不使用 Confluent 模式注册表,但我之前没有使用过 Kafka Streams,所以我不能说这是否可能。