将管道分隔文件转换为Spark DataFrame转换为CSV文件

use*_*466 1 csv scala apache-spark spark-streaming rdd

我有一个只有一栏的CSV文件,行定义如下:

123 || food || fruit
123 || food || fruit || orange 
123 || food || fruit || apple
Run Code Online (Sandbox Code Playgroud)

我想用一个单列和不同的行值创建一个csv文件:

orange
apple
Run Code Online (Sandbox Code Playgroud)

我尝试使用以下代码:

 val data = sc.textFile("fruits.csv")
 val rows = data.map(_.split("||"))
 val rddnew = rows.flatMap( arr => {
 val text = arr(0) 
 val words = text.split("||")
 words.map( word => ( word, text ) )
 } )
Run Code Online (Sandbox Code Playgroud)

但是这段代码并没有给我想要的正确结果。
有人可以帮我吗?

Ash*_*ish 6

您需要使用转义符拆分特殊字符,因为拆分需要使用正则表达式

.split("\\|\\|")
Run Code Online (Sandbox Code Playgroud)

转换为CSV非常棘手,因为数据字符串可能包含定界符(用引号引起来),换行符或其他对解析敏感的字符,因此我建议使用spark-csv

 val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", "||")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("words.csv")
Run Code Online (Sandbox Code Playgroud)

 words.write
  .format("com.databricks.spark.csv")
  .option("delimiter", "||")
  .option("header", "true")
  .save("words.csv")
Run Code Online (Sandbox Code Playgroud)