Spark Dataframe 以 avro 格式写入 kafka 主题?

Kal*_*hal 4 scala dataframe avro apache-kafka apache-spark

我在 Spark 中有一个数据框,看起来像

事件DF

   Sno|UserID|TypeExp
    1|JAS123|MOVIE
    2|ASP123|GAMES
    3|JAS123|CLOTHING
    4|DPS123|MOVIE
    5|DPS123|CLOTHING
    6|ASP123|MEDICAL
    7|JAS123|OTH
    8|POQ133|MEDICAL
    .......
    10000|DPS123|OTH
Run Code Online (Sandbox Code Playgroud)

我需要以 Avro 格式将其写入 Kafka 主题,目前我可以使用以下代码在 Kafka 中将其写入为 JSON

val kafkaUserDF: DataFrame = eventDF.select(to_json(struct(eventDF.columns.map(column):_*)).alias("value"))
  kafkaUserDF.selectExpr("CAST(value AS STRING)").write.format("kafka")
    .option("kafka.bootstrap.servers", "Host:port")
    .option("topic", "eventdf")
    .save()
Run Code Online (Sandbox Code Playgroud)

现在我想以 Avro 格式将其写入 Kafka 主题

hi-*_*zir 6

火花 >= 2.4

您可以使用to_avro从函数spark-avro库。

import org.apache.spark.sql.avro._

eventDF.select(
  to_avro(struct(eventDF.columns.map(column):_*)).alias("value")
)
Run Code Online (Sandbox Code Playgroud)

火花 < 2.4

你必须以同样的方式做到这一点:

  • 创建一个函数,将序列化的 Avro 记录写入ByteArrayOutputStream并返回结果。一个简单的实现(这仅支持平面对象)可能类似于(从Sushil Kumar Singh 的Kafka Avro Scala Example 中采用)

    import org.apache.spark.sql.Row
    
    def encode(schema: org.apache.avro.Schema)(row: Row): Array[Byte] = {
      val gr: GenericRecord = new GenericData.Record(schema)
      row.schema.fieldNames.foreach(name => gr.put(name, row.getAs(name)))
    
      val writer = new SpecificDatumWriter[GenericRecord](schema)
      val out = new ByteArrayOutputStream()
      val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
      writer.write(gr, encoder)
      encoder.flush()
      out.close()
    
      out.toByteArray()
    }
    
    Run Code Online (Sandbox Code Playgroud)
  • 将其转换为udf

    import org.apache.spark.sql.functions.udf
    
    val schema: org.apache.avro.Schema
    val encodeUDF = udf(encode(schema) _)
    
    Run Code Online (Sandbox Code Playgroud)
  • 将其用作替代品 to_json

    eventDF.select(
      encodeUDF(struct(eventDF.columns.map(column):_*)).alias("value")
    )
    
    Run Code Online (Sandbox Code Playgroud)