SFa*_*ima 6 scala apache-spark
Spark 1.5和Scala 2.10.6
我有一个使用"||"作为分隔符的数据文件.我很难解析这个以创建数据框.可以使用多个分隔符来创建数据框吗?该代码适用于单个损坏的管道,但不适用于多个分隔符.
我的代码:
val customSchema_1 = StructType(Array(
StructField("ID", StringType, true),
StructField("FILLER", StringType, true),
StructField("CODE", StringType, true)));
val df_1 = sqlContext.read
.format("com.databricks.spark.csv")
.schema(customSchema_1)
.option("delimiter", "¦¦")
.load("example.txt")
Run Code Online (Sandbox Code Playgroud)
样本文件:
12345¦¦ ¦¦10
Run Code Online (Sandbox Code Playgroud)
我遇到了这个问题并找到了一个很好的解决方案,我使用的是 spark 2.3,我觉得它应该适用于所有 spark 2.2+,但尚未对其进行测试。它的工作方式是我||用 a替换tab,然后内置的 csv 可以采用Dataset[String]。我使用制表符是因为我的数据中有逗号。
var df = spark.sqlContext.read
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", "\t")
.csv(spark.sqlContext.read.textFile("filename")
.map(line => line.split("\\|\\|").mkString("\t")))
Run Code Online (Sandbox Code Playgroud)
希望这对其他人有帮助。
编辑:
从 spark 3.0.1 开始,这是开箱即用的。
例子:
val ds = List("name||id", "foo||12", "brian||34", """"cray||name"||123""", "cray||name||123").toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]
val csv = spark.read.option("header", "true").option("inferSchema", "true").option("delimiter", "||").csv(ds)
csv: org.apache.spark.sql.DataFrame = [name: string, id: string]
csv.show
+----------+----+
| name| id|
+----------+----+
| foo| 12|
| brian| 34|
|cray||name| 123|
| cray|name|
+----------+----+
Run Code Online (Sandbox Code Playgroud)
所以这里发出的实际错误是:
java.lang.IllegalArgumentException: Delimiter cannot be more than one character: ¦¦
Run Code Online (Sandbox Code Playgroud)
文档证实了这个限制,我检查了 Spark 2.0 csv 阅读器,它有相同的要求。
鉴于所有这些,如果您的数据足够简单,您不会有包含 的条目¦¦,我会像这样加载您的数据:
scala> :pa
// Entering paste mode (ctrl-D to finish)
val customSchema_1 = StructType(Array(
StructField("ID", StringType, true),
StructField("FILLER", StringType, true),
StructField("CODE", StringType, true)));
// Exiting paste mode, now interpreting.
customSchema_1: org.apache.spark.sql.types.StructType = StructType(StructField(ID,StringType,true), StructField(FILLER,StringType,true), StructField(CODE,StringType,true))
scala> val rawData = sc.textFile("example.txt")
rawData: org.apache.spark.rdd.RDD[String] = example.txt MapPartitionsRDD[1] at textFile at <console>:31
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> val rowRDD = rawData.map(line => Row.fromSeq(line.split("¦¦")))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:34
scala> val df = sqlContext.createDataFrame(rowRDD, customSchema_1)
df: org.apache.spark.sql.DataFrame = [ID: string, FILLER: string, CODE: string]
scala> df.show
+-----+------+----+
| ID|FILLER|CODE|
+-----+------+----+
|12345| | 10|
+-----+------+----+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4469 次 |
| 最近记录: |