替换 csv 文件中的新行 (\n) 字符 - spark scala

use*_*220 6 replace scala newline character apache-spark

只是为了说明问题,我采用了一个测试集 csv 文件。但在实际情况下,问题必须处理的不仅仅是一个 TeraByte 数据。

我有一个 CSV 文件,其中的列用引号(“col1”)括起来。但是当数据导入完成时。一列包含换行符 (\n)。这导致我遇到很多问题,当我想将它们保存为 Hive 表时。

我的想法是用“|”替换\n字符 管火花。

到目前为止我实现了:

1. val test = sqlContext.load(
        "com.databricks.spark.csv",
        Map("path" -> "test_set.csv", "header" -> "true", "inferSchema" -> "true", "delimiter" -> "," , "quote" -> "\"", "escape" -> "\\" ,"parserLib" -> "univocity" ))#read a csv file

 2.   val dataframe = test.toDF() #convert to dataframe

  3.    dataframe.foreach(println) #print

    4. dataframe.map(row => {
        val row4 = row.getAs[String](4)
        val make = row4.replaceAll("[\r\n]", "|") 
        (make)
      }).collect().foreach(println) #replace not working for me
Run Code Online (Sandbox Code Playgroud)

样本集:

(17 , D73 ,525, 1  ,testing\n    ,  90 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,526, 1  ,null         ,  89 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,529, 1  ,once \n again,  10 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,531, 1  ,test3\n      ,  10 ,20.07.2011 ,null ,F10 , R)
Run Code Online (Sandbox Code Playgroud)

预期结果集:

(17 , D73 ,525, 1  ,testing|    ,  90 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,526, 1  ,null         ,  89 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,529, 1  ,once | again,  10 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,531, 1  ,test3|      ,  10 ,20.07.2011 ,null ,F10 , R)
Run Code Online (Sandbox Code Playgroud)

什么对我有用:

val rep = "\n123\n Main Street\n".replaceAll("[\\r\\n]", "|") rep: String = |123| Main Street|
Run Code Online (Sandbox Code Playgroud)

但为什么我不能在元组的基础上做?

 val dataRDD = lines_wo_header.map(line => line.split(";")).map(row => (row(0).toLong, row(1).toString, 
                                               row(2).toLong, row(3).toLong, 
                                               row(4).toString, row(5).toLong,
                                               row(6).toString, row(7).toString, row(8).toString,row(9).toString)) 

dataRDD.map(row => {
                val wert = row._5.replaceAll("[\\r\\n]", "|") 
                (row._1,row._2,row._3,row._4,wert,row._6, row._7,row._8,row._9,row._10)
                }).collect().foreach(println)
Run Code Online (Sandbox Code Playgroud)

Spark --version 1.3.1

Dan*_*ula 8

如果您可以使用 Spark SQL 1.5 或更高版本,您可以考虑使用可用于列的函数。假设您不知道(或没有)列的名称,您可以按照以下代码段进行操作:

val df = test.toDF()

import org.apache.spark.sql.functions._
val newDF = df.withColumn(df.columns(4), regexp_replace(col(df.columns(4)), "[\\r\\n]", "|"))
Run Code Online (Sandbox Code Playgroud)

如果您知道列的名称,则可以df.columns(4)在两次出现时用其名称替换。

我希望这有帮助。干杯。