小编Col*_*man的帖子

Spark Python Avro Kafka Deserialiser

我在python spark应用程序中创建了一个kafka流,可以解析通过它的任何文本.

            kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
Run Code Online (Sandbox Code Playgroud)

我想改变它,以便能够从kafka主题解析avro消息.解析文件中的avro消息时,我这样做:

            reader = DataFileReader(open("customer.avro", "r"), DatumReader())  
Run Code Online (Sandbox Code Playgroud)

我是python和spark的新手,如何更改流以解析avro消息?另外,如何在从Kafka读取Avro消息时指定要使用的模式?我以前在java中完成了所有这些,但是python让我感到困惑.

编辑:

我尝试改为包含avro解码器

            kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1},valueDecoder=avro.io.DatumReader(schema))
Run Code Online (Sandbox Code Playgroud)

但是我收到以下错误

            TypeError: 'DatumReader' object is not callable
Run Code Online (Sandbox Code Playgroud)

python avro apache-kafka apache-spark spark-streaming

3
推荐指数
1
解决办法
3927
查看次数