Spark结构化流kafka转换JSON没有架构(推断架构)

Arn*_*man 10 schema apache-kafka apache-spark spark-structured-streaming

我读过Spark Structured Streaming不支持将Kafka消息作为JSON读取的模式推断.有没有办法像Spark Streaming那样检索模式:

val dataFrame = spark.read.json(rdd.map(_.value()))
dataFrame.printschema 
Run Code Online (Sandbox Code Playgroud)

D.M*_*.M. 9

这是一种可行的方法:

  1. 在开始流式传输之前,从Kafka获取一小部分数据

  2. 从小批量中推断出架构

  3. 使用提取的模式开始流式传输数据.

下面的伪代码说明了这种方法.

步骤1:

从Kafka中提取一小批(两个记录),

val smallBatch = spark.read.format("kafka")
                           .option("kafka.bootstrap.servers", "node:9092")
                           .option("subscribe", "topicName")
                           .option("startingOffsets", "earliest")
                           .option("endingOffsets", """{"topicName":{"0":2}}""")
                           .load()
                           .selectExpr("CAST(value AS STRING) as STRING").as[String].toDF()
Run Code Online (Sandbox Code Playgroud)

第2步:将小批量写入文件:

smallBatch.write.mode("overwrite").format("text").save("/batch")
Run Code Online (Sandbox Code Playgroud)

此命令将小批量写入hdfs目录/批处理.它创建的文件的名称是part-xyz*.因此,您首先需要使用hadoop FileSystem命令重命名该文件(请参阅org.apache.hadoop.fs._和org.apache.hadoop.conf.Configuration,这是一个示例/sf/answers/2939360161/)和然后将文件读为json:

val smallBatchSchema = spark.read.json("/batch/batchName.txt").schema
Run Code Online (Sandbox Code Playgroud)

这里,batchName.txt是文件的新名称,smallBatchSchema包含从小批量推断的模式.

最后,您可以按如下方式传输数据(步骤3):

val inputDf = spark.readStream.format("kafka")
                             .option("kafka.bootstrap.servers", "node:9092")
                             .option("subscribe", "topicName")
                             .option("startingOffsets", "earliest")
                             .load()

val dataDf = inputDf.selectExpr("CAST(value AS STRING) as json")
                    .select( from_json($"json", schema=smallBatchSchema).as("data"))
                    .select("data.*")
Run Code Online (Sandbox Code Playgroud)

希望这可以帮助!


Ayd*_* K. 6

采用这种结构可能:

myStream = spark.readStream.schema(spark.read.json("my_sample_json_file_as_schema.json").schema).json("my_json_file")..
Run Code Online (Sandbox Code Playgroud)

怎么会这样?好吧,由于spark.read.json(“ ..”)。schema恰好返回了所需的推断模式,因此可以将此返回模式用作spark.readStream的强制模式参数的参数。

我所做的是指定一个单行样本json作为推断架构的输入,因此它不会不必要地占用内存。如果您的数据发生变化,只需更新您的sample-json。

花了我一段时间才能弄清楚(手动构造StructTypes和StructFields有点痛苦。),因此,我将为所有支持而高兴:-)


小智 3

这不可能。Spark Streaming 支持开发中的有限模式推断,spark.sql.streaming.schemaInference设置为true

默认情况下,来自基于文件的源的结构化流需要您指定架构,而不是依赖 Spark 自动推断它。此限制确保即使在失败的情况下,流式查询也将使用一致的模式。对于临时用例,您可以通过将 Spark.sql.streaming.schemaInference 设置为 true 来重新启用模式推断。

但它不能用于从 Kafka 消息中提取 JSON,并且DataFrameReader.json不支持流式传输Datasets作为参数。

您必须手动提供模式如何使用结构化流从 Kafka 读取 JSON 格式的记录?