Sou*_*uni 15 avro apache-kafka apache-spark confluent-schema-registry spark-structured-streaming
I'm using a Kafka source in Spark Structured Streaming to receive Confluent-encoded Avro records. I intend to use the Confluent Schema Registry, but integration with Spark Structured Streaming seems to be impossible.
I have seen this question, but couldn't get it working with the Confluent Schema Registry: Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)
tst*_*tes 16
It took me a couple of months of reading source code and testing things out. In a nutshell, Spark can only handle String and Binary serialization; you must manually deserialize the data. In Spark, create the Confluent REST service object to get the schema. Convert the schema string in the response object into an Avro schema using the Avro parser. Next, read the Kafka topic as normal. Then map over the binary-typed "value" column with the Confluent KafkaAvroDeserializer. I strongly suggest getting into the source code for these classes, because there is a lot going on here; for brevity I'll leave out many details.
//Used Confluent version 3.2.2 to write this.
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import scala.collection.JavaConverters._ // needed for props.asJava below

case class DeserializedFromKafkaRecord(key: String, value: String)

val schemaRegistryURL = "http://127.0.0.1:8081"
val topicName = "Schema-Registry-Example-topic1"
val subjectValueName = topicName + "-value"

//create RestService object
val restService = new RestService(schemaRegistryURL)

//.getLatestVersion returns io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

//Use Avro parsing classes to get Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

//key schema is typically just string but you can do the same process for the key as the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)

//Create a map with the Schema Registry url.
//This is the only required configuration for Confluent's KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)

//Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null

//Create structured streaming DF to read from the topic.
//"sql" is the SparkSession; mapping to a case class also needs its implicits (import sql.implicits._) in scope.
val rawTopicMessageDF = sql.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 20) //remove for prod
  .load()

//Instantiate the SerDe classes if not already, then deserialize!
val deserializedTopicMessageDS = rawTopicMessageDF.map { row =>
  if (keyDeserializer == null) {
    keyDeserializer = new KafkaAvroDeserializer
    keyDeserializer.configure(props.asJava, true)  //isKey = true
  }
  if (valueDeserializer == null) {
    valueDeserializer = new KafkaAvroDeserializer
    valueDeserializer.configure(props.asJava, false) //isKey = false
  }

  //Pass the Avro schema.
  //The topic name is actually unused in the deserializer source code, it is just required by the signature. Weird right?
  val deserializedKeyString = keyDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("key"), keySchema).toString
  val deserializedValueString = valueDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("value"), topicValueAvroSchema).toString

  DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueString)
}

val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()
dud*_*rce 12
Another very simple alternative for pyspark (without full Schema Registry support such as schema registration, compatibility checks, etc.) could be:
import requests
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.avro.functions import *

# variables
topic = "my-topic"
schemaregistry = "http://localhost:8081"
kafka_brokers = "kafka1:9092,kafka2:9092"

# retrieve the latest schema
response = requests.get('{}/subjects/{}-value/versions/latest/schema'.format(schemaregistry, topic))

# error check
response.raise_for_status()

# extract the schema from the response
schema = response.text

# run the query (wrapped in parentheses so the inline comments don't break the line continuation)
query = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_brokers)
    .option("subscribe", topic)
    .load()
    # The magic goes here:
    # skip the first 5 bytes (magic byte + 4-byte schema id reserved by the schema registry wire format)
    .selectExpr("substring(value, 6) as avro_value")
    .select(from_avro(col("avro_value"), schema).alias("data"))
    .select(col("data.my_field"))
    .writeStream
    .format("console")
    .outputMode("append")  # "complete" requires a streaming aggregation, so use "append" here
    .start())
This code was only tested on localhost, and has been reported to run into serializer issues in a clustered environment. There is an alternative solution (steps 7-9, with Scala code in step 10) that extracts the schema IDs into a column, looks up each unique ID, and then uses schema broadcast variables, which will scale better.
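For reference, here is a minimal Scala sketch of that schema-id extraction idea (the column names and the helper UDF are illustrative, not taken from the linked answer): Confluent's wire format prefixes every message with one magic byte and a 4-byte big-endian schema id, so the Avro payload starts at byte 6.

import java.nio.ByteBuffer
import org.apache.spark.sql.functions.{col, expr, udf}

// Hypothetical helper: read bytes 1-4 of the Confluent-framed message as the schema id
val schemaIdOf = udf((bytes: Array[Byte]) => ByteBuffer.wrap(bytes, 1, 4).getInt)

// "rawDf" is assumed to be the DataFrame returned by spark.readStream.format("kafka")...load()
val withSchemaId = rawDf.select(
  schemaIdOf(col("value")).as("schemaId"),      // which registered schema wrote this record
  expr("substring(value, 6)").as("avroPayload") // the Avro bytes without the 5-byte prefix
)
// Each distinct schemaId can then be fetched from the registry once and broadcast
// to the executors, instead of hitting the registry for every record.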
Also, there is an external library, AbsaOSS/ABRiS, that addresses using the Schema Registry with Spark.
Since the other answer that was mostly useful was removed, I wanted to re-add it with some refactoring and comments.
Here are the dependencies needed. Code tested with Confluent 5.x and Spark 2.4:
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>${confluent.version}</version>
    <exclusions>
        <!-- Conflicts with Spark's version -->
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
Here is the Scala implementation (only tested locally on master=local[*]).
First section: define the imports, some fields, and a few helper methods to fetch the schemas.
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode

object App {

  private var schemaRegistryClient: SchemaRegistryClient = _
  private var kafkaAvroDeserializer: AvroDeserializer = _

  def lookupTopicSchema(topic: String, isKey: Boolean = false) = {
    schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
  }

  def avroSchemaToSparkSchema(avroSchema: String) = {
    SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
  }

  // ... continues below
Then define a simple main method that parses the CMD args to get the Kafka details.
  def main(args: Array[String]): Unit = {
    val cmd: CommandLine = parseArg(args)

    val master = cmd.getOptionValue("master", "local[*]")
    val spark = SparkSession.builder()
      .appName(App.getClass.getName)
      .master(master)
      .getOrCreate()

    val bootstrapServers = cmd.getOptionValue("bootstrap-server")
    val topic = cmd.getOptionValue("topic")
    val schemaRegistryUrl = cmd.getOptionValue("schema-registry")

    consumeAvro(spark, bootstrapServers, topic, schemaRegistryUrl)

    spark.stop()
  }

  // ... still continues
Then, the important method that consumes the Kafka topic and deserializes it.
  private def consumeAvro(spark: SparkSession, bootstrapServers: String, topic: String, schemaRegistryUrl: String): Unit = {
    import spark.implicits._

    // Setup the Avro deserialization UDF
    schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      kafkaAvroDeserializer.deserialize(bytes)
    )

    // Load the raw Kafka topic (byte stream)
    val rawDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

    // Deserialize byte stream into strings (Avro fields become JSON)
    import org.apache.spark.sql.functions._
    val jsonDf = rawDf.select(
      // 'key.cast(DataTypes.StringType),       // string keys are simplest to use
      callUDF("deserialize", 'key).as("key"),   // but sometimes they are avro
      callUDF("deserialize", 'value).as("value")
      // excluding topic, partition, offset, timestamp, etc
    )

    // Get the Avro schema for the topic from the Schema Registry and convert it into a Spark schema type
    val dfValueSchema = {
      val rawSchema = lookupTopicSchema(topic)
      avroSchemaToSparkSchema(rawSchema)
    }

    // Apply structured schema to JSON stream
    val parsedDf = jsonDf.select(
      'key, // keys are usually plain strings
      // values are JSONified Avro records
      from_json('value, dfValueSchema.dataType).alias("value")
    ).select(
      'key,
      $"value.*" // flatten out the value
    )

    // parsedDf.printSchema()
    // Sample schema output
    // root
    //  |-- key: string (nullable = true)
    //  |-- header: struct (nullable = true)
    //  |    |-- time: long (nullable = true)
    //  |    ...

    // TODO: Do something interesting with this stream
    parsedDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("truncate", false)
      .start()
      .awaitTermination()
  }

  // still continues
The command line parser allows for passing in the bootstrap servers, schema registry, topic name, and Spark master.
  private def parseArg(args: Array[String]): CommandLine = {
    import org.apache.commons.cli._

    val options = new Options

    val masterOption = new Option("m", "master", true, "Spark master")
    masterOption.setRequired(false)
    options.addOption(masterOption)

    val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
    bootstrapOption.setRequired(true)
    options.addOption(bootstrapOption)

    val topicOption = new Option("t", "topic", true, "Kafka topic")
    topicOption.setRequired(true)
    options.addOption(topicOption)

    val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
    schemaRegOption.setRequired(true)
    options.addOption(schemaRegOption)

    val parser = new BasicParser
    parser.parse(options, args)
  }

  // still continues
In order for the UDF above to work, there needs to be a deserializer that takes the DataFrame of bytes to one containing deserialized Avro.
  // Simple wrapper around Confluent deserializer
  class AvroDeserializer extends AbstractKafkaAvroDeserializer {
    def this(client: SchemaRegistryClient) {
      this()
      // TODO: configure the deserializer for authentication
      this.schemaRegistry = client
    }

    override def deserialize(bytes: Array[Byte]): String = {
      val value = super.deserialize(bytes)
      value match {
        case str: String =>
          str
        case _ =>
          val genericRecord = value.asInstanceOf[GenericRecord]
          genericRecord.toString
      }
    }
  }

} // end 'object App'
Putting each of these blocks together, it works in IntelliJ after adding -b localhost:9092 -s http://localhost:8081 -t myTopic to Run Configurations > Program Arguments.
小智 7
Here is an example of my code integrating Spark Structured Streaming with Kafka and the Schema Registry (code in Scala).
import org.apache.spark.sql.SparkSession
import io.confluent.kafka.schemaregistry.client.rest.RestService // <artifactId>kafka-schema-registry</artifactId>
import org.apache.spark.sql.avro.from_avro // <artifactId>spark-avro_${scala.compat.version}</artifactId>
import org.apache.spark.sql.functions.col

object KafkaConsumerAvro {

  def main(args: Array[String]): Unit = {

    val KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
    val SCHEMA_REGISTRY_URL = "http://localhost:8081"
    val TOPIC = "transactions"

    val spark: SparkSession = SparkSession.builder().appName("KafkaConsumerAvro").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
      .option("subscribe", TOPIC)
      .option("startingOffsets", "earliest") // from starting
      .load()

    // Prints Kafka schema with columns (topic, offset, partition e.t.c)
    df.printSchema()

    // Create REST service to access schema registry and retrieve topic schema (latest)
    val restService = new RestService(SCHEMA_REGISTRY_URL)
    val valueRestResponseSchema = restService.getLatestVersion(TOPIC + "-value")
    val jsonSchema = valueRestResponseSchema.getSchema

    val transactionDF = df.select(
      col("key").cast("string"), // cast to string from binary value
      from_avro(col("value"), jsonSchema).as("transaction"), // convert from avro value
      col("topic"),
      col("offset"),
      col("timestamp"),
      col("timestampType"))
    transactionDF.printSchema()

    // Stream data to console for testing
    transactionDF.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }

}
When reading from the Kafka topic, we get this kind of schema:
key: binary | value: binary | topic: string | partition: integer | offset: long | timestamp: timestamp | timestampType: integer
As we can see, key and value are binary, so we need to cast the key to a string, and in this case the value is Avro-encoded, which we can decode by calling the from_avro function.
Apart from the Spark and Kafka dependencies, we need these dependencies:
<!-- READ AND WRITE AVRO DATA -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_${scala.compat.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

<!-- INTEGRATION WITH SCHEMA REGISTRY -->
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry</artifactId>
    <version>${confluent.version}</version>
</dependency>
Databricks now provides this functionality, but you have to pay for it :-(
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start() // a streaming write is started with start(); save() only applies to batch writes
See https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html for more information.
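For completeness, the same Databricks page also documents the read side; roughly like the sketch below (this from_avro overload that takes a subject name and registry address is Databricks Runtime only, servers and schemaRegistryAddr are placeholders as above, and the from_avro/to_avro import is omitted here just as in the snippet above):

// Databricks Runtime only: this from_avro overload takes the subject name and the
// Schema Registry address and resolves the schema automatically.
val decodedDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))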
A good free alternative is ABRiS. See https://github.com/AbsaOSS/ABRiS - the only downside we can see is that you need to provide a file with your Avro schema at runtime so the framework can enforce this schema on your DataFrame before publishing it to the Kafka topic.
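For anyone evaluating ABRiS: with the newer ABRiS API (5.x+), the schema file is no longer needed because the reader schema can be downloaded from the registry. A rough sketch follows; the config-builder names are taken from the ABRiS README and may differ between versions, and "kafkaDf" is just a placeholder for the raw Kafka DataFrame.

// Sketch based on the ABRiS 5.x+ API; check the project README for the exact
// artifact coordinates and builder methods matching your Spark/Confluent versions.
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("my-topic")
  .usingSchemaRegistry("http://localhost:8081")

// "kafkaDf" is assumed to be the DataFrame from spark.readStream.format("kafka")...load()
val decoded = kafkaDf.select(from_avro(col("value"), abrisConfig).as("data"))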
Based on @cricket_007's answer, I created the following solution that can run in our cluster environment, with a couple of additions visible in the code below: both the key and the value are deserialized, and the topic name, Schema Registry URL and schema strings are shipped to the executors as broadcast variables.
First, you need to load some packages:
SCALA_VERSION="2.11"
SPARK_VERSION="2.4.4"
CONFLUENT_VERSION="5.2.2"

jars=(
  "org.apache.spark:spark-sql-kafka-0-10_${SCALA_VERSION}:${SPARK_VERSION}"    ## format("kafka")
  "org.apache.spark:spark-avro_${SCALA_VERSION}:${SPARK_VERSION}"              ## SchemaConverters
  "io.confluent:kafka-schema-registry:${CONFLUENT_VERSION}"                    ## import io.confluent.kafka.schemaregistry.client.rest.RestService
  "io.confluent:kafka-avro-serializer:${CONFLUENT_VERSION}"                    ## import io.confluent.kafka.serializers.KafkaAvroDeserializer
)

jars_csv="${jars[*]}"                              # join the array entries with spaces
./bin/spark-shell --packages "${jars_csv// /,}"    # then turn the spaces into commas
Below is the whole code I tested in spark-shell:
import org.apache.avro.Schema
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.schemaregistry.client.rest.RestService
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.avro.SchemaConverters
import scala.collection.JavaConverters._
import java.time.LocalDateTime

spark.sparkContext.setLogLevel("Error")

val brokerServers = "xxx.yyy.zzz:9092"
val topicName = "mytopic"
val schemaRegistryURL = "http://xxx.yyy.zzz:8081"

val restService = new RestService(schemaRegistryURL)
val exParser = new Schema.Parser

//-- For both key and value
val schemaNames = Seq("key", "value")
val schemaStrings = schemaNames.map(i => (i -> restService.getLatestVersion(s"$topicName-$i").getSchema)).toMap
val tempStructMap = schemaStrings.transform((k, v) => SchemaConverters.toSqlType(exParser.parse(v)).dataType)
val schemaStruct = new StructType().add("key", tempStructMap("key")).add("value", tempStructMap("value"))

//-- For key only
// val schemaStrings = restService.getLatestVersion(s"$topicName-key").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType

//-- For value only
// val schemaStrings = restService.getLatestVersion(s"$topicName-value").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType

val query = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load()
  .writeStream
  .outputMode("append")
  //.option("checkpointLocation", s"cos://$bucket.service/checkpoints/$tableName")
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {
    val bcTopicName = sc.broadcast(topicName)
    val bcSchemaRegistryURL = sc.broadcast(schemaRegistryURL)
    val bcSchemaStrings = sc.broadcast(schemaStrings)

    val rstDF = batchDF.map { row =>
      val props = Map("schema.registry.url" -> bcSchemaRegistryURL.value)

      //-- For both key and value
      val isKeys = Map("key" -> true, "value" -> false)
      val deserializers = isKeys.transform { (k, v) =>
        val des = new KafkaAvroDeserializer
        des.configure(props.asJava, v)
        des
      }

      //-- For key only
      // val deserializer = new KafkaAvroDeserializer
      // deserializer.configure(props.asJava, true)

      //-- For value only
      // val deserializer = new KafkaAvroDeserializer
      // deserializer.configure(props.asJava, false)

      val inParser = new Schema.Parser

      //-- For both key and value
      val values = bcSchemaStrings.value.transform((k, v) =>
        deserializers(k).deserialize(bcTopicName.value, row.getAs[Array[Byte]](k), inParser.parse(v)).toString)
      s"""{"key": ${values("key")}, "value": ${values("value")} }"""

      //-- For key only
      // deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("key"), inParser.parse(bcSchemaStrings.value)).toString

      //-- For value only
      // deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("value"), inParser.parse(bcSchemaStrings.value)).toString
    }
      .select(from_json(col("value"), schemaStruct).as("root"))
      .select("root.*")

    println(s"${LocalDateTime.now} --- Batch $batchId: ${rstDF.count} rows")
    rstDF.printSchema
    rstDF.show(false)
  })
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start()

query.awaitTermination()