我正在尝试使用Avro来读取/写入Kafka的消息.有没有人有一个使用Avro二进制编码器编码/解码将被放在消息队列中的数据的例子?
我需要Avro部件而不是Kafka部件.或者,也许我应该看一个不同的解决方案?基本上,我正试图在空间方面找到更有效的JSON解决方案.刚刚提到Avro,因为它比JSON更紧凑.
我有一个spark 2.0应用程序,它使用spark streaming(使用spark-streaming-kafka-0-10_2.11)从kafka读取消息.
结构化流看起来很酷,所以我想尝试迁移代码,但我无法弄清楚如何使用它.
在常规流媒体中,我使用kafkaUtils来创建Dstrean,在我传递的参数中是值deserializer.
在结构化流媒体中,doc说我应该使用DataFrame函数进行反序列化,但我无法确切地知道这意味着什么.
我查看了这个示例,例如我在Kafka中的Avro对象是退出复杂的,不能简单地像示例中的String一样进行转换.
到目前为止,我尝试了这种代码(我在这里看到了另一个问题):
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","RED-test-tal4").load()
ds1.printSchema()
ds1.select("value").printSchema()
val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
Run Code Online (Sandbox Code Playgroud)
我得到"数据类型不匹配:无法将BinaryType转换为StructType(StructField(...."
我怎样才能反序化值?