Arn*_*man 10 schema apache-kafka apache-spark spark-structured-streaming
我读过Spark Structured Streaming不支持将Kafka消息作为JSON读取的模式推断.有没有办法像Spark Streaming那样检索模式:
val dataFrame = spark.read.json(rdd.map(_.value()))
dataFrame.printschema
Run Code Online (Sandbox Code Playgroud)
这是一种可行的方法:
在开始流式传输之前,从Kafka获取一小部分数据
从小批量中推断出架构
使用提取的模式开始流式传输数据.
下面的伪代码说明了这种方法.
步骤1:
从Kafka中提取一小批(两个记录),
val smallBatch = spark.read.format("kafka")
.option("kafka.bootstrap.servers", "node:9092")
.option("subscribe", "topicName")
.option("startingOffsets", "earliest")
.option("endingOffsets", """{"topicName":{"0":2}}""")
.load()
.selectExpr("CAST(value AS STRING) as STRING").as[String].toDF()
Run Code Online (Sandbox Code Playgroud)
第2步:将小批量写入文件:
smallBatch.write.mode("overwrite").format("text").save("/batch")
Run Code Online (Sandbox Code Playgroud)
此命令将小批量写入hdfs目录/批处理.它创建的文件的名称是part-xyz*.因此,您首先需要使用hadoop FileSystem命令重命名该文件(请参阅org.apache.hadoop.fs._和org.apache.hadoop.conf.Configuration,这是一个示例/sf/answers/2939360161/)和然后将文件读为json:
val smallBatchSchema = spark.read.json("/batch/batchName.txt").schema
Run Code Online (Sandbox Code Playgroud)
这里,batchName.txt是文件的新名称,smallBatchSchema包含从小批量推断的模式.
最后,您可以按如下方式传输数据(步骤3):
val inputDf = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "node:9092")
.option("subscribe", "topicName")
.option("startingOffsets", "earliest")
.load()
val dataDf = inputDf.selectExpr("CAST(value AS STRING) as json")
.select( from_json($"json", schema=smallBatchSchema).as("data"))
.select("data.*")
Run Code Online (Sandbox Code Playgroud)
希望这可以帮助!
它是采用这种结构可能:
myStream = spark.readStream.schema(spark.read.json("my_sample_json_file_as_schema.json").schema).json("my_json_file")..
Run Code Online (Sandbox Code Playgroud)
怎么会这样?好吧,由于spark.read.json(“ ..”)。schema恰好返回了所需的推断模式,因此可以将此返回模式用作spark.readStream的强制模式参数的参数。
我所做的是指定一个单行样本json作为推断架构的输入,因此它不会不必要地占用内存。如果您的数据发生变化,只需更新您的sample-json。
花了我一段时间才能弄清楚(手动构造StructTypes和StructFields有点痛苦。),因此,我将为所有支持而高兴:-)
小智 3
这不可能。Spark Streaming 支持开发中的有限模式推断,spark.sql.streaming.schemaInference设置为true:
默认情况下,来自基于文件的源的结构化流需要您指定架构,而不是依赖 Spark 自动推断它。此限制确保即使在失败的情况下,流式查询也将使用一致的模式。对于临时用例,您可以通过将 Spark.sql.streaming.schemaInference 设置为 true 来重新启用模式推断。
但它不能用于从 Kafka 消息中提取 JSON,并且DataFrameReader.json不支持流式传输Datasets作为参数。
您必须手动提供模式如何使用结构化流从 Kafka 读取 JSON 格式的记录?
| 归档时间: |
|
| 查看次数: |
7971 次 |
| 最近记录: |