我在python spark应用程序中创建了一个kafka流,可以解析通过它的任何文本.
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
Run Code Online (Sandbox Code Playgroud)
我想改变它,以便能够从kafka主题解析avro消息.解析文件中的avro消息时,我这样做:
reader = DataFileReader(open("customer.avro", "r"), DatumReader())
Run Code Online (Sandbox Code Playgroud)
我是python和spark的新手,如何更改流以解析avro消息?另外,如何在从Kafka读取Avro消息时指定要使用的模式?我以前在java中完成了所有这些,但是python让我感到困惑.
编辑:
我尝试改为包含avro解码器
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1},valueDecoder=avro.io.DatumReader(schema))
Run Code Online (Sandbox Code Playgroud)
但是我收到以下错误
TypeError: 'DatumReader' object is not callable
Run Code Online (Sandbox Code Playgroud)