How*_*Xie 10 schema apache-spark apache-spark-sql spark-structured-streaming
我有一个简单的结构化流作业,它监视 CSV 文件的目录并写入镶木地板文件 - 两者之间没有转换。
这项工作首先通过使用 读取 CSV 文件来构建数据框readStream(),并使用我通过调用名为buildSchema(). 这是代码:
var df = spark
.readStream
.option("sep", "|")
.option("header","true")
.schema(buildSchema(spark, table_name).get) // buildSchema() gets schema for me
.csv(input_base_dir + table_name + "*")
logger.info(" new batch indicator")
if (df.schema != buildSchema(spark, table_name).get) {
df = spark.sqlContext.createDataFrame(df.collectAsList(), buildSchema(spark, table_name).get)
}
val query =
df.writeStream
.format("parquet")
.queryName("convertCSVtoPqrquet for table " + table_name)
.option("path", output_base_dir + table_name + "/")
.trigger(ProcessingTime(60.seconds))
.start()
Run Code Online (Sandbox Code Playgroud)
工作运行良好,但我的问题是,我想始终使用最新的架构来构建我的数据框,或者换句话说,从 CSV 文件中读取。虽然buildSchema()可以获得最新的架构,但我不确定如何定期调用它(或每个 CSV 文件一次),然后使用最新的架构以某种方式重新生成或修改数据框。
测试时,我观察到只有query对象是一批接一批地连续运行;我放置的日志语句和if()模式比较语句仅在应用程序开始时发生一次。
query.start()调用后可以修改结构化流作业中的数据帧模式吗?如果我们无法更改数据框的架构,您有什么好的解决方法?
提前致谢。
您可以利用foreachBatch方法定期加载最新的模式,然后将其与具体的微批量数据帧模式进行比较。例子:
var streamingDF = spark
.readStream
.option("sep", "|")
.option("header", "true")
.schema(buildSchema(spark, table_name).get) // buildSchema() gets schema for me
.csv(input_base_dir + table_name + "*")
val query =
streamingDF
.writeStream
.foreachBatch((ds, i) => {
logger.info(s"New batch indicator(${i})")
val batchDf =
if (ds.schema != buildSchema(spark, table_name).get) {
spark.sqlContext.createDataFrame(ds.collectAsList(), buildSchema(spark, table_name).get)
} else {
ds
}
batchDf.write.parquet(output_base_dir + table_name + "/")
})
.trigger(ProcessingTime(60.seconds))
.start()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1063 次 |
| 最近记录: |