Integrating Spark Structured Streaming with Confluent Schema Registry

Sou*_*uni 15 avro apache-kafka apache-spark confluent-schema-registry spark-structured-streaming

I am using a Kafka source in Spark Structured Streaming to receive Confluent-encoded Avro records. I intend to use the Confluent Schema Registry, but integrating it with Spark Structured Streaming seems to be impossible.

I have seen this question, but could not get it working with the Confluent Schema Registry: Reading Avro messages from Kafka with Spark 2.0.2 (Structured Streaming)

tst*_*tes 16

I spent a couple of months reading source code and testing things out. In a nutshell, Spark can only handle String and binary serialization; you must manually deserialize the data. In Spark, create the Confluent REST service object to get the schema. Convert the schema string in the response object into an Avro schema using the Avro parser. Next, read the Kafka topic as normal. Then map over the binary-typed "value" column with the Confluent KafkaAvroDeserializer. I strongly suggest getting into the source code for these classes, because there is a lot going on here; for brevity I'll leave out many details.

//Used Confluent version 3.2.2 to write this. 
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema

import scala.collection.JavaConverters._ //needed for .asJava in the configure() calls below
import spark.implicits._                 //needed for the Dataset encoder used by .map below

case class DeserializedFromKafkaRecord(key: String, value: String)

val schemaRegistryURL = "http://127.0.0.1:8081"

val topicName = "Schema-Registry-Example-topic1"
val subjectValueName = topicName + "-value"

//create RestService object
val restService = new RestService(schemaRegistryURL)

//.getLatestVersion returns io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

//Use Avro parsing classes to get Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

//key schema is typically just string but you can do the same process for the key as the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)

//Create a map with the Schema registry url.
//This is the only Required configuration for Confluent's KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)

//Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null

//Create structured streaming DF to read from the topic.
val rawTopicMessageDF = spark.readStream  //"spark" is the SparkSession
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 20)  //remove for prod
  .load()

//instantiate the SerDe classes if not already, then deserialize!
val deserializedTopicMessageDS = rawTopicMessageDF.map{
  row =>
    if (keyDeserializer == null) {
      keyDeserializer = new KafkaAvroDeserializer
      keyDeserializer.configure(props.asJava, true)  //isKey = true
    }
    if (valueDeserializer == null) {
      valueDeserializer = new KafkaAvroDeserializer
      valueDeserializer.configure(props.asJava, false) //isKey = false
    }

    //Pass the Avro schema.
    val deserializedKeyString = keyDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("key"), keySchema).toString //topic name is actually unused in the source code, just required by the signature. Weird right?
    val deserializedValueString = valueDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("value"), topicValueAvroSchema).toString

    DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueString)
}

val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", false)
    .start()

  • It seems the deserialize method's signature requires a topic-name string, but it is unused in the method body. [KafkaAvroDeserializer.java](https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializer.java) (2 upvotes)
  • @Karthikeyan https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java is part of the `io.confluent:kafka-schema-registry-client` artifact; installation instructions are here: https://docs.confluent.io/current/clients/install.html#installation-maven (2 upvotes)

dud*_*rce 12

Another very simple alternative for pyspark (without full Schema Registry support such as schema registration, compatibility checks, etc.) could be:

import requests

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.avro.functions import *

# variables
topic = "my-topic"
schemaregistry = "http://localhost:8081"
kafka_brokers = "kafka1:9092,kafka2:9092"

# retrieve the latest schema
response = requests.get('{}/subjects/{}-value/versions/latest/schema'.format(schemaregistry, topic))

# error check
response.raise_for_status()

# extract the schema from the response
schema = response.text

# run the query
query = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_brokers)
    .option("subscribe", topic)
    .load()
    # The magic goes here:
    # skip the first 5 bytes (reserved by the Schema Registry wire format)
    .selectExpr("substring(value, 6) as avro_value")
    .select(from_avro(col("avro_value"), schema).alias("data"))
    .select(col("data.my_field"))
    .writeStream
    .format("console")
    .outputMode("append")  # no aggregation in this query, so "complete" would be rejected
    .start()
)

  • @Venkat, hi, this is necessary because Confluent reserves the first bytes for its internal [wire format](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format) (6 upvotes)
  • # The magic goes here: this worked for me. But why do we need to skip the first 5 bytes? (4 upvotes)
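
For reference, the Confluent wire format puts one magic byte (0) and a 4-byte big-endian schema ID in front of the Avro payload, which is why the snippet above skips the first 5 bytes. A tiny sketch of splitting that header out (written in Scala to match the other answers in this thread; the helper name is illustrative):

import java.nio.ByteBuffer

//Splits a Confluent-framed record into (schemaId, avroPayload).
//Wire format: magic byte 0x0, then a 4-byte big-endian schema ID, then the Avro bytes.
def splitConfluentFrame(bytes: Array[Byte]): (Int, Array[Byte]) = {
  require(bytes.length >= 5 && bytes(0) == 0, "not a Confluent-framed Avro record")
  val schemaId = ByteBuffer.wrap(bytes, 1, 4).getInt
  (schemaId, bytes.drop(5))
}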

cri*_*007 8

Disclaimer

This code was only tested on localhost, and it has been reported to run into serializer issues in a clustered environment. There is an alternative solution (steps 7-9, with Scala code in step 10) that extracts the schema IDs into a column, looks up each unique ID, and then uses a schema broadcast variable, which will scale better; a rough sketch of that idea appears right below.
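
A rough sketch of that alternative, assuming it runs inside a foreachBatch sink (so the per-batch collect is legal); the helper name resolveSchemas is illustrative, and the exact SchemaRegistryClient method for fetching a schema by ID may differ between Confluent versions:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

//Pull the 4-byte schema ID (bytes 2-5 of the Confluent wire format) into a column,
//resolve each distinct ID once on the driver, and broadcast the id -> schema-string map
//so executors never have to touch the (non-serializable) registry client.
def resolveSchemas(batchDF: DataFrame, schemaRegistryUrl: String) = {
  val spark = batchDF.sparkSession
  import spark.implicits._

  val withId = batchDF.withColumn(
    "schemaId",
    expr("cast(conv(hex(substring(value, 2, 4)), 16, 10) as int)"))

  val ids = withId.select($"schemaId").distinct().as[Int].collect()

  val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
  val schemasById = ids.map(id => id -> client.getById(id).toString).toMap

  (withId, spark.sparkContext.broadcast(schemasById))
}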

There is also an external library, AbsaOSS/ABRiS, that addresses using the Registry with Spark.


Since the other, most useful answer was removed, I wanted to re-add it with some refactoring and comments.

These are the required dependencies. Code tested with Confluent 5.x and Spark 2.4.

    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>${confluent.version}</version>
        <exclusions>
            <!-- Conflicts with Spark's version -->
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
 
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

Here is the Scala implementation (only tested locally on master=local[*])

The first part defines the imports, some fields, and a few helper methods for fetching schemas

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode

object App {

  private var schemaRegistryClient: SchemaRegistryClient = _

  private var kafkaAvroDeserializer: AvroDeserializer = _

  def lookupTopicSchema(topic: String, isKey: Boolean = false) = {
    schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
  }

  def avroSchemaToSparkSchema(avroSchema: String) = {
    SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
  }

 // ... continues below

Then define a simple main method that parses the CMD args to get the Kafka details

  def main(args: Array[String]): Unit = {
    val cmd: CommandLine = parseArg(args)

    val master = cmd.getOptionValue("master", "local[*]")
    val spark = SparkSession.builder()
      .appName(App.getClass.getName)
      .master(master)
      .getOrCreate()

    val bootstrapServers = cmd.getOptionValue("bootstrap-server")
    val topic = cmd.getOptionValue("topic")
    val schemaRegistryUrl = cmd.getOptionValue("schema-registry")

    consumeAvro(spark, bootstrapServers, topic, schemaRegistryUrl)

    spark.stop()
  }


  // ... still continues

Then, the important method that consumes the Kafka topic and deserializes it

  private def consumeAvro(spark: SparkSession, bootstrapServers: String, topic: String, schemaRegistryUrl: String): Unit = {
    import spark.implicits._

    // Setup the Avro deserialization UDF
    schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient) 
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      kafkaAvroDeserializer.deserialize(bytes)
    )

    // Load the raw Kafka topic (byte stream)
    val rawDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

    // Deserialize byte stream into strings (Avro fields become JSON)
    import org.apache.spark.sql.functions._
    val jsonDf = rawDf.select(
      // 'key.cast(DataTypes.StringType),  // string keys are simplest to use
      callUDF("deserialize", 'key).as("key"), // but sometimes they are avro
      callUDF("deserialize", 'value).as("value")
      // excluding topic, partition, offset, timestamp, etc
    )

    // Get the Avro schema for the topic from the Schema Registry and convert it into a Spark schema type
    val dfValueSchema = {
      val rawSchema = lookupTopicSchema(topic)
      avroSchemaToSparkSchema(rawSchema)
    }

    // Apply structured schema to JSON stream
    val parsedDf = jsonDf.select(
      'key, // keys are usually plain strings
      // values are JSONified Avro records
      from_json('value, dfValueSchema.dataType).alias("value")
    ).select(
      'key,
      $"value.*" // flatten out the value
    )

    // parsedDf.printSchema()

    // Sample schema output
    // root
    // |-- key: string (nullable = true)
    // |-- header: struct (nullable = true)
    // |    |-- time: long (nullable = true)
    // |    ...

    // TODO: Do something interesting with this stream
    parsedDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("truncate", false)
      .start()
      .awaitTermination()
  }

 // still continues

The command-line parser allows passing in the bootstrap servers, Schema Registry URL, topic name, and Spark master.

  private def parseArg(args: Array[String]): CommandLine = {
    import org.apache.commons.cli._

    val options = new Options

    val masterOption = new Option("m", "master", true, "Spark master")
    masterOption.setRequired(false)
    options.addOption(masterOption)

    val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
    bootstrapOption.setRequired(true)
    options.addOption(bootstrapOption)

    val topicOption = new Option("t", "topic", true, "Kafka topic")
    topicOption.setRequired(true)
    options.addOption(topicOption)

    val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
    schemaRegOption.setRequired(true)
    options.addOption(schemaRegOption)

    val parser = new BasicParser
    parser.parse(options, args)
  }

  // still continues

For the UDF above to work, there needs to be a deserializer to take the DataFrame of bytes to one containing deserialized Avro

  // Simple wrapper around Confluent deserializer
  class AvroDeserializer extends AbstractKafkaAvroDeserializer {
    def this(client: SchemaRegistryClient) {
      this()
      // TODO: configure the deserializer for authentication 
      this.schemaRegistry = client
    }

    override def deserialize(bytes: Array[Byte]): String = {
      val value = super.deserialize(bytes)
      value match {
        case str: String =>
          str
        case _ =>
          val genericRecord = value.asInstanceOf[GenericRecord]
          genericRecord.toString
      }
    }
  }

} // end 'object App'

Putting all of these blocks together, it works in IntelliJ after adding `-b localhost:9092 -s http://localhost:8081 -t myTopic` to Run Configurations > Program Arguments

  • It doesn't work in standalone cluster mode.. it throws Failed to execute user defined function(anonfun$consumeAvro$1: (binary) => string) (2 upvotes)

Fel*_*elo 7

This library will do the job for you. It connects to the Confluent Schema Registry through Spark Structured Streaming.

For Confluent, it handles the schema ID that is sent along with the payload.

In the README you will find a code snippet on how to do it.

DISCLOSURE: I work for ABSA and I developed this library.
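
For context, a minimal read-side sketch in the style of the ABRiS README, assuming an ABRiS 4.x-style configuration (the builder and function names are quoted from memory, so check the README of the version you use):

import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

//Assumed ABRiS 4.x-style config: download the latest reader schema for the
//"my-topic-value" subject from the Schema Registry (TopicNameStrategy).
val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("my-topic")
  .usingSchemaRegistry("http://localhost:8081")

val deserializedDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
  .select(from_avro(col("value"), abrisConfig).as("data"))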


小智 7

Here is an example of my code that integrates Spark Structured Streaming with Kafka and the Schema Registry (code in Scala)

import org.apache.spark.sql.SparkSession
import io.confluent.kafka.schemaregistry.client.rest.RestService // <artifactId>kafka-schema-registry</artifactId>
import org.apache.spark.sql.avro.from_avro // <artifactId>spark-avro_${scala.compat.version}</artifactId>
import org.apache.spark.sql.functions.col

object KafkaConsumerAvro {

  def main(args: Array[String]): Unit = {

    val KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
    val SCHEMA_REGISTRY_URL = "http://localhost:8081"
    val TOPIC = "transactions"

    val spark: SparkSession = SparkSession.builder().appName("KafkaConsumerAvro").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
      .option("subscribe", TOPIC)
      .option("startingOffsets", "earliest") // from starting
      .load()

//     Prints Kafka schema with columns (topic, offset, partition e.t.c)
    df.printSchema()

//    Create REST service to access schema registry and retrieve topic schema (latest)
    val restService = new RestService(SCHEMA_REGISTRY_URL)
    val valueRestResponseSchema = restService.getLatestVersion(TOPIC + "-value")
    val jsonSchema = valueRestResponseSchema.getSchema

    val transactionDF = df.select(
      col("key").cast("string"), // cast to string from binary value
      from_avro(col("value"), jsonSchema).as("transaction"), // convert from avro value
      col("topic"),
      col("offset"),
      col("timestamp"),
      col("timestampType"))
    transactionDF.printSchema()

//    Stream data to console for testing
    transactionDF.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }

}

When reading from the Kafka topic, we get this schema:

key: binary | value: binary | topic: string | partition: integer | offset: long | timestamp: timestamp | timestampType: integer |

As we can see, the key and value are binary, so we need to cast the key to a string and, in this case, the value is Avro-formatted, so we can achieve that by calling the from_avro function.

In addition to the Spark and Kafka dependencies, we need the following dependencies:

<!-- READ AND WRITE AVRO DATA -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_${scala.compat.version}</artifactId>
  <version>${spark.version}</version>
</dependency>
<!-- INTEGRATION WITH SCHEMA REGISTRY -->
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry</artifactId>
  <version>${confluent.version}</version>
</dependency>


ran*_*l25 5

Databricks now provides this functionality, but you have to pay for it :-(

dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .save()

See https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html for more information.
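
From memory of that same Databricks page, the read side looks roughly like the following; the three-argument from_avro overload (column, subject name, Schema Registry address) is a Databricks-only API, so treat the exact signature as an assumption and verify it against the linked docs:

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))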

A good free alternative is ABRiS. See https://github.com/AbsaOSS/ABRiS. The only downside we could see is that you need to provide a file with your Avro schema at runtime so the framework can enforce this schema on your dataframe before publishing it to the Kafka topic.

  • Only Databricks supports the registry, not Apache Spark itself (2 upvotes)

tim*_*ang 5

Based on @cricket_007's answer, I created the following solution, which can run in our cluster environment, and added the following new features:

  • You need to add broadcast variables to transfer some values into the map operations in a cluster environment. Neither Schema.Parser nor KafkaAvroDeserializer is serializable in Spark, which is why you need to initialize them inside the map operation.
  • My structured streaming uses a foreachBatch output sink.
  • I applied org.apache.spark.sql.avro.SchemaConverters to convert the Avro schema format into a Spark StructType, so that it can be used in the from_json column function to parse the dataframe's Kafka topic fields (key and value).

First, you need to load some packages:

SCALA_VERSION="2.11"
SPARK_VERSION="2.4.4"
CONFLUENT_VERSION="5.2.2"

jars=(
  "org.apache.spark:spark-sql-kafka-0-10_${SCALA_VERSION}:${SPARK_VERSION}"    ## format("kafka")
  "org.apache.spark:spark-avro_${SCALA_VERSION}:${SPARK_VERSION}"    ## SchemaConverters
  "io.confluent:kafka-schema-registry:${CONFLUENT_VERSION}"   ## import io.confluent.kafka.schemaregistry.client.rest.RestService
  "io.confluent:kafka-avro-serializer:${CONFLUENT_VERSION}"   ## import io.confluent.kafka.serializers.KafkaAvroDeserializer
)

./bin/spark-shell --packages "$(IFS=,; echo "${jars[*]}")"   ## join the coordinates with commas

Here is the complete code that I tested in spark-shell:

import org.apache.avro.Schema
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.schemaregistry.client.rest.RestService

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.avro.SchemaConverters

import scala.collection.JavaConverters._
import java.time.LocalDateTime

spark.sparkContext.setLogLevel("Error")

val brokerServers = "xxx.yyy.zzz:9092"
val topicName = "mytopic" 
val schemaRegistryURL = "http://xxx.yyy.zzz:8081"

val restService = new RestService(schemaRegistryURL)

val exParser = new Schema.Parser
//-- For both key and value
val schemaNames = Seq("key", "value")
val schemaStrings = schemaNames.map(i => (i -> restService.getLatestVersion(s"$topicName-$i").getSchema)).toMap
val tempStructMap = schemaStrings.transform((k,v) => SchemaConverters.toSqlType(exParser.parse(v)).dataType)
val schemaStruct = new StructType().add("key", tempStructMap("key")).add("value", tempStructMap("value"))
//-- For key only 
// val schemaStrings = restService.getLatestVersion(s"$topicName-key").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType
//-- For value only 
// val schemaStrings = restService.getLatestVersion(s"$topicName-value").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType


val query = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load()
  .writeStream
  .outputMode("append")
  //.option("checkpointLocation", s"cos://$bucket.service/checkpoints/$tableName")
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {

    val bcTopicName = sc.broadcast(topicName)
    val bcSchemaRegistryURL = sc.broadcast(schemaRegistryURL)
    val bcSchemaStrings = sc.broadcast(schemaStrings)
    
    val rstDF = batchDF.map {
      row =>
      
        val props = Map("schema.registry.url" -> bcSchemaRegistryURL.value)
        //-- For both key and value
        val isKeys =  Map("key" -> true, "value" -> false)
        val deserializers = isKeys.transform{ (k,v) => 
            val des = new KafkaAvroDeserializer
            des.configure(props.asJava, v)
            des 
        }
        //-- For key only 
        // val deserializer = new KafkaAvroDeserializer
        // deserializer.configure(props.asJava, true)
        //-- For value only 
        // val deserializer = new KafkaAvroDeserializer
        // deserializer.configure(props.asJava, false)
        

        val inParser = new Schema.Parser
        //-- For both key and value
        val values = bcSchemaStrings.value.transform( (k,v) => 
            deserializers(k).deserialize(bcTopicName.value, row.getAs[Array[Byte]](k), inParser.parse(v)).toString)
        s"""{"key": ${values("key")}, "value": ${values("value")} }"""
        //-- For key only 
        // deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("key"), inParser.parse(bcSchemaStrings.value)).toString
        //-- For value only 
        // deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("value"), inParser.parse(bcSchemaStrings.value)).toString  
      }
      .select(from_json(col("value"), schemaStruct).as("root"))
      .select("root.*")

    println(s"${LocalDateTime.now} --- Batch $batchId: ${rstDF.count} rows")
    rstDF.printSchema
    rstDF.show(false)    

  })
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start()

query.awaitTermination()