Spark-SQL:如何将TSV或CSV文件读入数据帧并应用自定义模式?

sta*_*010 5 scala apache-spark apache-spark-sql spark-dataframe

我在使用制表符分隔值(TSV)和逗号分隔值(CSV)文件时使用Spark 2.0.我想将数据加载到Spark-SQL数据帧中,我想在读取文件时完全控制模式.我不希望Spark从文件中的数据中猜出架构.

如何将TSV或CSV文件加载到Spark SQL Dataframes并将模式应用于它们?

sta*_*010 15

下面是一个完整的Spark 2.0示例,用于加载制表符分隔值(TSV)文件和应用架构.

我正在使用UAH.edu的TSV格式Iris数据集作为示例.以下是该文件的前几行:

Type    PW      PL      SW      SL
0       2       14      33      50
1       24      56      31      67
1       23      51      31      69
0       2       10      36      46
1       20      52      30      65
Run Code Online (Sandbox Code Playgroud)

要强制执行模式,可以使用以下两种方法之一以编程方式构建它:

A.使用以下方法创建架构StructType:

import org.apache.spark.sql.types._

var irisSchema = StructType(Array(
    StructField("Type",         IntegerType, true),
    StructField("PetalWidth",   IntegerType, true),
    StructField("PetalLength",  IntegerType, true),
    StructField("SepalWidth",   IntegerType, true),
    StructField("SepalLength",  IntegerType, true)
    ))
Run Code Online (Sandbox Code Playgroud)

B.或者,创建模式与case classEncoders(此方法不太详细):

import org.apache.spark.sql.Encoders

case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int, 
                      SepalWidth: Int, SepalLength: Int)

var irisSchema = Encoders.product[IrisSchema].schema
Run Code Online (Sandbox Code Playgroud)

创建架构后,可以使用spark.read读取TSV文件.请注意,只要option("delimiter", d)正确设置选项,您实际上也可以读取逗号分隔值(CSV)文件或任何分隔文件.此外,如果您有一个具有标题行的数据文件,请务必进行设置option("header", "true").

以下是完整的最终代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoders

val spark = SparkSession.builder().getOrCreate()

case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int,
                      SepalWidth: Int, SepalLength: Int)

var irisSchema = Encoders.product[IrisSchema].schema

var irisDf = spark.read.format("csv").     // Use "csv" regardless of TSV or CSV.
                option("header", "true").  // Does the file have a header line?
                option("delimiter", "\t"). // Set delimiter to tab or comma.
                schema(irisSchema).        // Schema that was built above.
                load("iris.tsv")

irisDf.show(5)
Run Code Online (Sandbox Code Playgroud)

这是输出:

scala> irisDf.show(5)
+----+----------+-----------+----------+-----------+
|Type|PetalWidth|PetalLength|SepalWidth|SepalLength|
+----+----------+-----------+----------+-----------+
|   0|         2|         14|        33|         50|
|   1|        24|         56|        31|         67|
|   1|        23|         51|        31|         69|
|   0|         2|         10|        36|         46|
|   1|        20|         52|        30|         65|
+----+----------+-----------+----------+-----------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)