如何在CSV中使用双管作为分隔符?

SFa*_*ima 6 scala apache-spark

Spark 1.5和Scala 2.10.6

我有一个使用"||"作为分隔符的数据文件.我很难解析这个以创建数据框.可以使用多个分隔符来创建数据框吗?该代码适用于单个损坏的管道,但不适用于多个分隔符.

我的代码:

val customSchema_1 = StructType(Array(
    StructField("ID", StringType, true), 
    StructField("FILLER", StringType, true), 
    StructField("CODE", StringType, true)));

val df_1 = sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(customSchema_1)
    .option("delimiter", "¦¦")
    .load("example.txt")
Run Code Online (Sandbox Code Playgroud)

样本文件:

12345¦¦  ¦¦10
Run Code Online (Sandbox Code Playgroud)

loc*_*obr 6

我遇到了这个问题并找到了一个很好的解决方案,我使用的是 spark 2.3,我觉得它应该适用于所有 spark 2.2+,但尚未对其进行测试。它的工作方式是我||用 a替换tab,然后内置的 csv 可以采用Dataset[String]。我使用制表符是因为我的数据中有逗号。

var df = spark.sqlContext.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", "\t")
  .csv(spark.sqlContext.read.textFile("filename")
      .map(line => line.split("\\|\\|").mkString("\t")))
Run Code Online (Sandbox Code Playgroud)

希望这对其他人有帮助。

编辑:

从 spark 3.0.1 开始,这是开箱即用的。

例子:

val ds = List("name||id", "foo||12", "brian||34", """"cray||name"||123""", "cray||name||123").toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]

val csv = spark.read.option("header", "true").option("inferSchema", "true").option("delimiter", "||").csv(ds)
csv: org.apache.spark.sql.DataFrame = [name: string, id: string]

csv.show
+----------+----+
|      name|  id|
+----------+----+
|       foo|  12|
|     brian|  34|
|cray||name| 123|
|      cray|name|
+----------+----+
Run Code Online (Sandbox Code Playgroud)


eva*_*man 5

所以这里发出的实际错误是:

java.lang.IllegalArgumentException: Delimiter cannot be more than one character: ¦¦
Run Code Online (Sandbox Code Playgroud)

文档证实了这个限制,我检查了 Spark 2.0 csv 阅读器,它有相同的要求。

鉴于所有这些,如果您的数据足够简单,您不会有包含 的条目¦¦,我会像这样加载您的数据:

scala> :pa
// Entering paste mode (ctrl-D to finish)
val customSchema_1 = StructType(Array(
    StructField("ID", StringType, true), 
    StructField("FILLER", StringType, true), 
    StructField("CODE", StringType, true)));

// Exiting paste mode, now interpreting.
customSchema_1: org.apache.spark.sql.types.StructType = StructType(StructField(ID,StringType,true), StructField(FILLER,StringType,true), StructField(CODE,StringType,true))

scala> val rawData = sc.textFile("example.txt")
rawData: org.apache.spark.rdd.RDD[String] = example.txt MapPartitionsRDD[1] at textFile at <console>:31

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val rowRDD = rawData.map(line => Row.fromSeq(line.split("¦¦")))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:34

scala> val df = sqlContext.createDataFrame(rowRDD, customSchema_1)
df: org.apache.spark.sql.DataFrame = [ID: string, FILLER: string, CODE: string]

scala> df.show
+-----+------+----+
|   ID|FILLER|CODE|
+-----+------+----+
|12345|      |  10|
+-----+------+----+
Run Code Online (Sandbox Code Playgroud)