我有一个 CSV 文件,我正在尝试使用Spark CSV 包加载它,但它没有正确加载数据,因为其中很少有字段\n,例如以下两行
"XYZ", "Test Data", "TestNew\nline", "OtherData"
"XYZ", "Test Data", "blablablabla
\nblablablablablalbal", "OtherData"
Run Code Online (Sandbox Code Playgroud)
我使用下面的代码这是我使用直截了当parserLib的univocity在网上阅读它解决了多个问题,换行,但它似乎并不适合我的情况。
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.option("parserLib","univocity")
.load("data.csv");
Run Code Online (Sandbox Code Playgroud)
如何在以引号开头的字段中替换换行符。有没有更简单的方法?
scala apache-spark apache-spark-sql spark-csv apache-spark-1.6
这就是我在 Spark 数据框中加载 csv 文件的方式
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
val df1Final=df1result.withColumn("DataPartition", lit(null: String))
Run Code Online (Sandbox Code Playgroud)
这是我的输入文件名之一的示例。
Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPrivate.CUS.1.2017-09-07-1056.Full
Fundamental.FinancialLineItem.FinancialLineItem.Japan.CUS.1.2017-09-07-1056.Full.txt
Run Code Online (Sandbox Code Playgroud)
现在我想读取这个文件并用“.”分割它。运算符,然后添加 CUS 作为新列来代替 DataPartition 。
我可以在没有任何 UDF 的情况下做到这一点吗?
这是现有数据框的架构
root
|-- LineItem_organizationId: long (nullable = true)
|-- …Run Code Online (Sandbox Code Playgroud) 我正在尝试将CSV文件转换为镶木地板,并且我正在使用Spark来完成此操作。
SparkSession spark = SparkSession
.builder()
.appName(appName)
.config("spark.master", master)
.getOrCreate();
Dataset<Row> logFile = spark.read().csv("log_file.csv");
logFile.write().parquet("log_file.parquet");
Run Code Online (Sandbox Code Playgroud)
现在的问题是我没有定义架构,列看起来像这样(输出在spark中使用printSchema()显示)
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
....
Run Code Online (Sandbox Code Playgroud)
csv在第一行有名称,但是我想它们被忽略了,问题是只有几列是字符串,我也有整数和日期。
我只使用Spark,基本上没有Avro或其他任何功能(从未使用过Avro)。
我定义模式有哪些选择?如何选择?如果我需要用其他方式编写镶木地板文件,那么只要它是一种快速简便的解决方案,就没有问题。
(我正在使用Spark Standalone进行测试/不知道Scala)
我有一个 CSV 文件,其中最后一列位于括号内,并且值以逗号分隔。最后一列中值的数量是可变的。当我将它们读为带有一些列名称的 Dataframe 时,如下所示,我得到了Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match. 我的 CSV 文件如下所示
a1,b1,true,2017-05-16T07:00:41.0000000,2.5,(c1,d1,e1)
a2,b2,true,2017-05-26T07:00:42.0000000,0.5,(c2,d2,e2,f2,g2)
a2,b2,true,2017-05-26T07:00:42.0000000,0.5,(c2)
a2,b2,true,2017-05-26T07:00:42.0000000,0.5,(c2,d2)
a2,b2,true,2017-05-26T07:00:42.0000000,0.5,(c2,d2,e2)
a2,b2,true,2017-05-26T07:00:42.0000000,0.5,(c2,d2,e2,k2,f2)
Run Code Online (Sandbox Code Playgroud)
我最终想要的是这样的:
root
|-- MId: string (nullable = true)
|-- PId: string (nullable = true)
|-- IsTeacher: boolean(nullable = true)
|-- STime: datetype(nullable = true)
|-- TotalMinutes: double(nullable = true)
|-- SomeArrayHeader: array<string>(nullable = true)
Run Code Online (Sandbox Code Playgroud)
到目前为止我已经编写了以下代码:
val infoDF =
sqlContext.read.format("csv")
.option("header", "false")
.load(inputPath)
.toDF(
"MId",
"PId",
"IsTeacher",
"STime",
"TotalMinutes",
"SomeArrayHeader")
Run Code Online (Sandbox Code Playgroud)
我想在不给出列名的情况下阅读它们,然后将第五列之后的列转换为数组类型。但后来我遇到了括号的问题。有没有一种方法可以在阅读并告知括号内的字段实际上是数组类型的一个字段时执行此操作。