inferSchema=true does not work for reading CSV files with Spark Structured Streaming

Ily*_*iev 5 scala apache-spark spark-csv spark-structured-streaming

I am getting the error message

```
java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.

    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:251)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:115)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:115)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:232)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:242)
    at org.apache.spark.sql.streaming.DataStreamReader.csv(DataStreamReader.scala:404)
    at io.sekai.core.streaming.KafkaDataGenerator.readFromCSVFile(KafkaDataGenerator.scala:38)
```

when I load a CSV file with

```scala
spark2
  .readStream
  .format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  //.schema(schema)
  .option("delimiter", ",")
  .option("maxFilesPerTrigger", 1)
  .csv(path)
```

I also tried another form of passing the options, e.g.

```scala
spark2
  .readStream
  .format("csv")
  .option("inferSchema", value = true)
  .option("header", value = true)
  //.schema(schema)
  .option("delimiter", ",")
  .option("maxFilesPerTrigger", 1)
  .csv(path)
```

I want to infer the schema, which is why I commented out the explicit schema usage.


A sample of the CSV file looks like this:

```
id,Energy Data,Distance,Humidity,Ambient Temperature,Cold Water Temperature,Vibration Value 1,Vibration Value 2,Handle Movement
1,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
2,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
3,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
4,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
5,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
6,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
7,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
8,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
9,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
10,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
```

What is wrong here? I followed the option description exactly, so how is the schema inference supposed to happen?


Gab*_*bip 5

You have 2 options here:

  1. Before running the streaming query, write a sample of the data to your target location. When you run the streaming query again, the schema will be inferred from it.
  2. Set `spark.sql.streaming.schemaInference` to `true`:

```scala
spark.sql("set spark.sql.streaming.schemaInference=true")
```
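Putting option 2 together with the reader from the question, a minimal sketch (assuming the same `spark2` session and `path` value as in the question; with the flag set, the streaming file source infers the schema from files already present in the directory, so `.schema(...)` is no longer required):

```scala
// Ad-hoc use only: allow the streaming file source to infer the schema.
spark2.sql("set spark.sql.streaming.schemaInference=true")

// No .schema(...) call needed once the flag is set.
val df = spark2
  .readStream
  .format("csv")
  .option("header", "true")
  .option("delimiter", ",")
  .option("maxFilesPerTrigger", 1)
  .csv(path)
```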

From the documentation:

> By default, Structured Streaming from file based sources requires you to specify the schema, rather than relying on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can re-enable schema inference by setting `spark.sql.streaming.schemaInference` to `true`.
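Beyond those two options, the exception message itself points at the standard fix: declare the schema explicitly and pass it via the `.schema(schema)` call that is commented out in the question. A sketch of such a schema, with types guessed from the sample data (most columns kept as strings, since values like `25°C` and `55.50%` would not parse as numbers; note also that the unquoted comma in `4in, 12cm` will shift columns unless that field is quoted in the source file):

```scala
import org.apache.spark.sql.types._

// Hypothetical explicit schema matching the header row of the sample CSV.
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("Energy Data", StringType),
  StructField("Distance", StringType),
  StructField("Humidity", StringType),
  StructField("Ambient Temperature", StringType),
  StructField("Cold Water Temperature", StringType),
  StructField("Vibration Value 1", IntegerType),
  StructField("Vibration Value 2", IntegerType),
  StructField("Handle Movement", StringType)
))
```

With this in scope, re-enabling the `.schema(schema)` line from the question makes the error go away; the `inferSchema` option can then be dropped, since, as the error shows, it does not satisfy the streaming source's schema requirement on its own.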