Spark 结构化流 - 动态更新数据帧的架构

How*_*Xie 10 schema apache-spark apache-spark-sql spark-structured-streaming

我有一个简单的结构化流作业,它监视 CSV 文件的目录并写入镶木地板文件 - 两者之间没有转换。

这项工作首先通过使用 读取 CSV 文件来构建数据框readStream(),并使用我通过调用名为buildSchema(). 这是代码:

  var df = spark
    .readStream
    .option("sep", "|")
    .option("header","true")
    .schema(buildSchema(spark, table_name).get) // buildSchema() gets schema for me
    .csv(input_base_dir + table_name + "*")

  logger.info(" new batch indicator")

  if (df.schema != buildSchema(spark, table_name).get) {
    df = spark.sqlContext.createDataFrame(df.collectAsList(), buildSchema(spark, table_name).get)
  }

  val query =
    df.writeStream
      .format("parquet")
      .queryName("convertCSVtoPqrquet for table " + table_name)
      .option("path", output_base_dir + table_name + "/")
      .trigger(ProcessingTime(60.seconds))
      .start()
Run Code Online (Sandbox Code Playgroud)

工作运行良好,但我的问题是,我想始终使用最新的架构来构建我的数据框,或者换句话说,从 CSV 文件中读取。虽然buildSchema()可以获得最新的架构,但我不确定如何定期调用它(或每个 CSV 文件一次),然后使用最新的架构以某种方式重新生成或修改数据框。

测试时,我观察到只有query对象是一批接一批地连续运行;我放置的日志语句和if()模式比较语句仅在应用程序开始时发生一次。

query.start()调用后可以修改结构化流作业中的数据帧模式吗?如果我们无法更改数据框的架构,您有什么好的解决方法?

提前致谢。

Gal*_*odi 1

您可以利用foreachBatch方法定期加载最新的模式,然后将其与具体的微批量数据帧模式进行比较。例子:

    var streamingDF = spark
      .readStream
      .option("sep", "|")
      .option("header", "true")
      .schema(buildSchema(spark, table_name).get) // buildSchema() gets schema for me
      .csv(input_base_dir + table_name + "*")


    val query =
      streamingDF
        .writeStream
        .foreachBatch((ds, i) => {
          logger.info(s"New batch indicator(${i})")
          val batchDf =
          if (ds.schema != buildSchema(spark, table_name).get) {
            spark.sqlContext.createDataFrame(ds.collectAsList(), buildSchema(spark, table_name).get)
          } else {
            ds
          }
          batchDf.write.parquet(output_base_dir + table_name + "/")
        })
        .trigger(ProcessingTime(60.seconds))
        .start()
Run Code Online (Sandbox Code Playgroud)