Read a CSV into a Spark DataFrame with timestamp and date types

Mih*_*nde 18 apache-spark apache-spark-sql apache-spark-1.6

This is on CDH with Spark 1.6.

I am trying to import this hypothetical CSV into an Apache Spark DataFrame:

$ hadoop fs -cat test.csv
a,b,c,2016-09-09,a,2016-11-11 09:09:09.0,a
a,b,c,2016-09-10,a,2016-11-11 09:09:10.0,a

I am using the databricks-csv jar.

val textData = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss")
    .option("inferSchema", "true")
    .option("nullValue", "null")
    .load("test.csv")

I use inferSchema to generate the schema for the resulting DataFrame. The printSchema() function gives me the following output for the code above:

scala> textData.printSchema()
root
 |-- C0: string (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: string (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: timestamp (nullable = true)
 |-- C6: string (nullable = true)

scala> textData.show()
+---+---+---+----------+---+--------------------+---+
| C0| C1| C2|        C3| C4|                  C5| C6|
+---+---+---+----------+---+--------------------+---+
|  a|  b|  c|2016-09-09|  a|2016-11-11 09:09:...|  a|
|  a|  b|  c|2016-09-10|  a|2016-11-11 09:09:...|  a|
+---+---+---+----------+---+--------------------+---+

Column C3 is of String type. I want C3 to be of date type. To get it to date type I tried the following code:

val textData = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd")
    .option("inferSchema", "true")
    .option("nullValue", "null")
    .load("test.csv")

scala> textData.printSchema
root
 |-- C0: string (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: timestamp (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: timestamp (nullable = true)
 |-- C6: string (nullable = true)

scala> textData.show()
+---+---+---+--------------------+---+--------------------+---+
| C0| C1| C2|                  C3| C4|                  C5| C6|
+---+---+---+--------------------+---+--------------------+---+
|  a|  b|  c|2016-09-09 00:00:...|  a|2016-11-11 00:00:...|  a|
|  a|  b|  c|2016-09-10 00:00:...|  a|2016-11-11 00:00:...|  a|
+---+---+---+--------------------+---+--------------------+---+

The only difference between this code and the first block is the dateFormat option (I use "yyyy-MM-dd" instead of "yyyy-MM-dd HH:mm:ss"). Now I get both C3 and C5 as timestamp (C3 is still not date). But for C5, the HH:mm:ss part is ignored and shows up as zeros in the data.
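The zeros are consistent with how spark-csv 1.x parses dates: the dateFormat string is handed to java.text.SimpleDateFormat, whose parse() stops once the pattern is exhausted, so the time-of-day is never read. A standalone sketch (plain Java time API, no Spark needed) shows the same effect:

import java.text.SimpleDateFormat

val fmt = new SimpleDateFormat("yyyy-MM-dd")
// parse() ignores whatever trails the pattern, so "09:09:09.0" is dropped
// and the resulting Date sits at midnight:
fmt.parse("2016-11-11 09:09:09.0")  // => Fri Nov 11 00:00:00 <zone> 2016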

Ideally, I want C3 to be of date type and C5 to be of timestamp type with its HH:mm:ss part not ignored. My current workaround looks like this: I produce the CSV by extracting data from my database in parallel, and I make sure to extract all dates as timestamps (not ideal). So the test CSV now looks like this:

$ hadoop fs -cat new-test.csv
a,b,c,2016-09-09 00:00:00,a,2016-11-11 09:09:09.0,a
a,b,c,2016-09-10 00:00:00,a,2016-11-11 09:09:10.0,a

Here is my final working code:

val textData = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss")
    .schema(finalSchema)
    .option("nullValue", "null")
    .load("new-test.csv")

Here I use the full timestamp format ("yyyy-MM-dd HH:mm:ss") in dateFormat. I manually create the finalSchema instance, where C3 is of Date type and C5 is of Timestamp type (Spark SQL types), and apply it with the schema() function. The output looks like this:
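(For reference, a minimal sketch of how finalSchema might have been defined; the post only shows the REPL echo below, so the exact construction is assumed from it:)

import org.apache.spark.sql.types._

val finalSchema = StructType(Seq(
    StructField("C0", StringType, nullable = true),
    StructField("C1", StringType, nullable = true),
    StructField("C2", StringType, nullable = true),
    StructField("C3", DateType, nullable = true),
    StructField("C4", StringType, nullable = true),
    StructField("C5", TimestampType, nullable = true),
    StructField("C6", StringType, nullable = true)
))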

scala> finalSchema
res4: org.apache.spark.sql.types.StructType = StructType(StructField(C0,StringType,true), StructField(C1,StringType,true), StructField(C2,StringType,true), StructField(C3,DateType,true), StructField(C4,StringType,true), StructField(C5,TimestampType,true), StructField(C6,StringType,true))

scala> textData.printSchema()
root
 |-- C0: string (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: date (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: timestamp (nullable = true)
 |-- C6: string (nullable = true)


scala> textData.show()
+---+---+---+----------+---+--------------------+---+
| C0| C1| C2|        C3| C4|                  C5| C6|
+---+---+---+----------+---+--------------------+---+
|  a|  b|  c|2016-09-09|  a|2016-11-11 09:09:...|  a|
|  a|  b|  c|2016-09-10|  a|2016-11-11 09:09:...|  a|
+---+---+---+----------+---+--------------------+---+

Is there an easier or out-of-the-box way to parse a CSV file (with both date and timestamp types) into a Spark DataFrame?

Relevant links:
http://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options
https://github.com/databricks/spark-csv

Jad*_*ins 5

With the infer option, non-trivial cases will probably not return the expected result. As you can see in InferSchema.scala:

if (field == null || field.isEmpty || field == nullValue) {
  typeSoFar
} else {
  typeSoFar match {
    case NullType => tryParseInteger(field)
    case IntegerType => tryParseInteger(field)
    case LongType => tryParseLong(field)
    case DoubleType => tryParseDouble(field)
    case TimestampType => tryParseTimestamp(field)
    case BooleanType => tryParseBoolean(field)
    case StringType => StringType
    case other: DataType =>
      throw new UnsupportedOperationException(s"Unexpected data type $other")
  }
}

It will only try to match each column against the timestamp type, not the date type, so an "out of the box" solution for this case is not possible. But in my experience, the "easier" solution is to directly define the schema with the needed types; it avoids having the infer option set a type that only matches the RDD it evaluated rather than the entire data. Your final schema is an efficient solution.
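For illustration, the widening logic quoted above means a column degrades toward StringType as soon as one row stops matching the narrower type. A hypothetical two-row file (mixed.csv, contents assumed) makes this visible:

// Hypothetical mixed.csv:
//   1,foo
//   abc,bar
val mixed = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .load("mixed.csv")

// C0 comes back as string: the first row alone would infer integer,
// but "abc" in the second row forces the wider StringType.
mixed.printSchema()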


Car*_*des 5

It is not very elegant, but you can convert from timestamp to date like this (check the last line):

import org.apache.spark.sql.functions.expr

val textData = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd")
    .option("inferSchema", "true")
    .option("nullValue", "null")
    .load("test.csv")
    // C3 is inferred as timestamp above; to_date truncates it to a date
    .withColumn("C3", expr("""to_date(C3)"""))
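Equivalently, a typed cast avoids the SQL-expression string; a small sketch under the same assumptions:

import org.apache.spark.sql.types.DateType

// Same effect as to_date(): casting a timestamp column to DateType
// drops the time-of-day portion.
val withDate = textData.withColumn("C3", textData("C3").cast(DateType))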