如何将csv直接加载到Spark数据集中?

Vas*_*kas 4 scala apache-spark apache-spark-sql

我有一个csv文件[1],我想直接加载到数据集中.问题是我总是得到错误

org.apache.spark.sql.AnalysisException: Cannot up cast `probability` from string to float as it may truncate
The type path of the target object is:
- field (class: "scala.Float", name: "probability")
- root class: "TFPredictionFormat"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;
Run Code Online (Sandbox Code Playgroud)

而且,特别是对于该phrases领域(检查案例类[2]),它得到了

org.apache.spark.sql.AnalysisException: cannot resolve '`phrases`' due to data type mismatch: cannot cast StringType to ArrayType(StringType,true);
Run Code Online (Sandbox Code Playgroud)

如果我将我的case类[2]中的所有字段定义为String类型,那么一切正常,但这不是我想要的.有没有一种简单的方法可以做到[3]?


参考

[1]一个示例行

B017NX63A2,Merrell,"['merrell_for_men', 'merrell_mens_shoes', 'merrel']",merrell_shoes,0.0806054356579781
Run Code Online (Sandbox Code Playgroud)

[2]我的代码片段如下

import spark.implicits._

val INPUT_TF = "<SOME_URI>/my_file.csv"

final case class TFFormat (
    doc_id: String,
    brand: String,
    phrases: Seq[String],
    prediction: String,
    probability: Float
)

val ds = sqlContext.read
.option("header", "true")
.option("charset", "UTF8")
.csv(INPUT_TF)
.as[TFFormat]

ds.take(1).map(println)
Run Code Online (Sandbox Code Playgroud)

[3]我已经找到了方法,首先在DataFrame级别定义列并将事物转换为数据集(如此此处此处),但我几乎可以肯定这不是应该完成的事情.我也很确定编码器可能是答案,但我不知道如何

use*_*411 6

TL; DR使用csv标准DataFrame操作进行输入转换是可行的方法.如果你想避免你应该使用具有表现力的输入格式(Parquet甚至是JSON).

通常,要转换为静态类型数据集的数据必须已经是正确的类型.最有效的方法是schemacsv读者提供论据:

val schema: StructType = ???
val ds = spark.read
  .option("header", "true")
  .schema(schema)
  .csv(path)
  .as[T]
Run Code Online (Sandbox Code Playgroud)

哪里schema可以通过反思来推断:

import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.types.StructType

val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]
Run Code Online (Sandbox Code Playgroud)

不幸的是,它不适用于您的数据和类,因为csv读者不支持ArrayType(但它适用于原子类型FloatType)因此您必须使用困难的方法.一个天真的解决方案可以表达如下:

import org.apache.spark.sql.functions._

val df: DataFrame = ???  // Raw data

df
  .withColumn("probability", $"probability".cast("float"))
  .withColumn("phrases",
    split(regexp_replace($"phrases", "[\\['\\]]", ""), ","))
  .as[TFFormat]
Run Code Online (Sandbox Code Playgroud)

但是根据内容的不同,你可能需要更复杂的东西phrases.

  • 谢谢!只需添加一个角度:也可以使用编码器推断模式,如:`Encoders.product [TFFormat] .schema` (3认同)