Spark Python Avro Kafka Deserialiser

Col*_*man 3 python avro apache-kafka apache-spark spark-streaming

我在python spark应用程序中创建了一个kafka流,可以解析通过它的任何文本.

            kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
Run Code Online (Sandbox Code Playgroud)

我想改变它,以便能够从kafka主题解析avro消息.解析文件中的avro消息时,我这样做:

            reader = DataFileReader(open("customer.avro", "r"), DatumReader())  
Run Code Online (Sandbox Code Playgroud)

我是python和spark的新手,如何更改流以解析avro消息?另外,如何在从Kafka读取Avro消息时指定要使用的模式?我以前在java中完成了所有这些,但是python让我感到困惑.

编辑:

我尝试改为包含avro解码器

            kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1},valueDecoder=avro.io.DatumReader(schema))
Run Code Online (Sandbox Code Playgroud)

但是我收到以下错误

            TypeError: 'DatumReader' object is not callable
Run Code Online (Sandbox Code Playgroud)

Zol*_*dor 5

我遇到了同样的挑战 - 在pyspark中反序列化来自Kafka的avro消息并使用Confluent Schema Registry模块的Messageserializer方法解决它,因为我们的模式存储在Confluent Schema Registry中.

您可以在https://github.com/verisign/python-confluent-schemaregistry找到该模块

from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer
schema_registry_client = CachedSchemaRegistryClient(url='http://xx.xxx.xxx:8081')
serializer = MessageSerializer(schema_registry_client)


# simple decode to replace Kafka-streaming's built-in decode decoding UTF8 ()
def decoder(s):
    decoded_message = serializer.decode_message(s)
    return decoded_message

kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"}, valueDecoder=decoder)

lines = kvs.map(lambda x: x[1])
lines.pprint()
Run Code Online (Sandbox Code Playgroud)

显然,你可以看到这个代码使用的是没有接收器的新的直接方法,因此创建了DirectDream(更多信息请参见https://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html)