使用Spark 2.0.2读取来自Kafka的Avro消息(结构化流式传输)

Tal*_*ffe 8 scala avro apache-kafka spark-streaming apache-spark-2.0

我有一个spark 2.0应用程序,它使用spark streaming(使用spark-streaming-kafka-0-10_2.11)从kafka读取消息.

结构化流看起来很酷,所以我想尝试迁移代码,但我无法弄清楚如何使用它.

在常规流媒体中,我使用kafkaUtils来创建Dstrean,在我传递的参数中是值deserializer.

在结构化流媒体中,doc说我应该使用DataFrame函数进行反序列化,但我无法确切地知道这意味着什么.

我查看了这个示例,例如我在Kafka中的Avro对象是退出复杂的,不能简单地像示例中的String一样进行转换.

到目前为止,我尝试了这种代码(我在这里看到了另一个问题):

import spark.implicits._

  val ds1 = spark.readStream.format("kafka").
    option("kafka.bootstrap.servers","localhost:9092").
    option("subscribe","RED-test-tal4").load()

  ds1.printSchema()
  ds1.select("value").printSchema()
  val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()  
  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()
Run Code Online (Sandbox Code Playgroud)

我得到"数据类型不匹配:无法将BinaryType转换为StructType(StructField(...."

我怎样才能反序化值?

Mic*_*oll 3

我还不太熟悉 Spark 的序列化如何与新的/实验性的结构化流结合使用,但是下面的方法确实有效 - 尽管我不确定这是否是最好的方法(恕我直言,该方法看起来有点尴尬)感觉)。

我将尝试以自定义数据类型(此处:案例Foo类)而不是专门的 Avro 为例来回答您的问题,但我希望它无论如何都能对您有所帮助。这个想法是使用 Kryo 序列化来序列化/反序列化您的自定义类型,请参阅Spark 文档中的调优:数据序列化。

注意:Spark 支持通过内置(隐式)编码器对案例类进行开箱即用的序列化,您可以通过import spark.implicits._. 但为了这个例子,让我们忽略这个功能。

想象一下,您已将以下Foo案例类定义为自定义类型(TL;DR 提示:为了防止遇到奇怪的 Spark 序列化投诉/错误,您应该将代码放入单独的Foo.scala文件中):

// This could also be your auto-generated Avro class/type
case class Foo(s: String)
Run Code Online (Sandbox Code Playgroud)

现在,您有以下结构化流代码来从 Kafka 读取数据,其中输入主题包含消息值是二进制编码的 Kafka 消息String,您的目标是基于这些消息值创建Foo实例(即类似于您的方式) d 将二进制数据反序列化为 Avro 类的实例):

val messages: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "my-input-topic")
    .load()
Run Code Online (Sandbox Code Playgroud)

现在我们将这些值反序列化为自定义Foo类型的实例,为此我们首先需要定义一个隐式Encoder[Foo]

implicit val myFooEncoder: Encoder[Foo] = org.apache.spark.sql.Encoders.kryo[Foo]
val foos: Dataset[Foo] = messages.map(row => Foo(new String(row.getAs[Array[Byte]]("value")))
Run Code Online (Sandbox Code Playgroud)

回到你的 Avro 问题,你需要做的是:

  1. 创建适合Encoder您需求的产品。
  2. 替换Foo(new String(row.getAs[Array[Byte]]("value"))为将二进制编码的 Avro 数据反序列化为 Avro POJO 的代码,即从消息值 ( ) 中取出二进制编码的 Avro 数据row.getAs[Array[Byte]]("value")并返回 AvroGenericRecordSpecificCustomAvroObject您在其他地方定义的任何内容的代码。

如果其他人知道更简洁/更好/...的方法来回答塔尔的问题,我会洗耳恭听。:-)

也可以看看: