如何在 Spark 中创建 Schema 文件

Gla*_*tor 5 schema scala apache-spark-sql orc

我正在尝试读取一个架构文件(它是一个文本文件)并将其应用于我的 CSV 文件而没有标题。由于我已经有一个架构文件,因此我不想使用InferSchema开销很大的选项。

我的输入架构文件如下所示,

"num IntegerType","letter StringType"
Run Code Online (Sandbox Code Playgroud)

我正在尝试使用以下代码来创建架构文件,

val schema_file = spark.read.textFile("D:\\Users\\Documents\\schemaFile.txt")
val struct_type = schema_file.flatMap(x => x.split(",")).map(b => (b.split(" ")(0).stripPrefix("\"").asInstanceOf[String],b.split(" ")(1).stripSuffix("\"").asInstanceOf[org.apache.spark.sql.types.DataType])).foreach(x=>println(x))
Run Code Online (Sandbox Code Playgroud)

我收到如下错误

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.DataType
Run Code Online (Sandbox Code Playgroud)

- 字段(类:“org.apache.spark.sql.types.DataType”,名称:“_2”) - 根类:“scala.Tuple2”

并尝试将其用作模式文件,同时使用spark.read.csv如下所示并将其写为 ORC 文件

  val df=spark.read
      .format("org.apache.spark.csv")
      .option("header", false)
      .option("inferSchema", true)
      .option("samplingRatio",0.01)
      .option("nullValue", "NULL")
      .option("delimiter","|")
      .schema(schema_file)
      .csv("D:\\Users\\sampleFile.txt")
      .toDF().write.format("orc").save("D:\\Users\\ORC")
Run Code Online (Sandbox Code Playgroud)

需要帮助将文本文件转换为架构文件并将我的输入 CSV 文件转换为 ORC。

ank*_*000 7

schema.json您可以创建一个以下格式命名的 JSON 文件

{
  "fields": [
    {
      "metadata": {},
      "name": "first_fields",
      "nullable": true,
      "type": "string"
    },
    {
      "metadata": {},
      "name": "double_field",
      "nullable": true,
      "type": "double"
    }
  ],
  "type": "struct"
}
Run Code Online (Sandbox Code Playgroud)

通过读取该文件创建一个结构模式

rdd = spark.sparkContext.wholeTextFiles("s3://<bucket>/schema.json")
text = rdd.collect()[0][1]
dict = json.loads(str(text))
custom_schema = StructType.fromJson(dict)
Run Code Online (Sandbox Code Playgroud)

之后,您可以使用struct作为模式来读取csv文件

val df=spark.read
      .format("org.apache.spark.csv")
      .option("header", false)
      .option("inferSchema", true)
      .option("samplingRatio",0.01)
      .option("nullValue", "NULL")
      .option("delimiter","|")
      .schema(custom_schema)
      .csv("D:\\Users\\sampleFile.txt")
      .toDF().write.format("orc").save("D:\\Users\\ORC")
Run Code Online (Sandbox Code Playgroud)


Sha*_*ala 6

若要从架构text文件创建一个函数来matchtype,并返回DataType作为

def getType(raw: String): DataType = {
  raw match {
    case "ByteType" => ByteType
    case "ShortType" => ShortType
    case "IntegerType" => IntegerType
    case "LongType" => LongType
    case "FloatType" => FloatType
    case "DoubleType" => DoubleType
    case "BooleanType" => BooleanType
    case "TimestampType" => TimestampType
    case _ => StringType
  }
}
Run Code Online (Sandbox Code Playgroud)

现在通过读取架构文件来创建架构

val schema = Source.fromFile("schema.txt").getLines().toList
  .flatMap(_.split(",")).map(_.replaceAll("\"", "").split(" "))
  .map(x => StructField(x(0), getType(x(1)), true))
Run Code Online (Sandbox Code Playgroud)

现在将 csv 文件读取为

spark.read
  .option("samplingRatio", "0.01")
  .option("delimiter", "|")
  .option("nullValue", "NULL")
  .schema(StructType(schema))
  .csv("data.csv")
Run Code Online (Sandbox Code Playgroud)

希望这可以帮助!