如何处理Spark中的多行?

Mpi*_*ris 6 scala apache-spark

我有一个数据框,其中包含一些多行观察结果:

+--------------------+----------------+
|         col1|               col2|
+--------------------+----------------+
|something1           |somethingelse1  |
|something2           |somethingelse2  |
|something3           |somethingelse3  |
|something4           |somethingelse4  |
|multiline

 row               |     somethings|
|something            |somethingall    |
Run Code Online (Sandbox Code Playgroud)

我想要的是以该数据帧的csv格式(或txt)保存。使用以下内容:

df
 .write
 .format("csv")
 .save("s3://../adf/")
Run Code Online (Sandbox Code Playgroud)

但是,当我检查文件时,它将观察结果分成多行。我想要的是在txt / csv文件中具有“多行”观测值的行是同一行。我试图将其另存为txt文件:

df
.as[(String,String)]
.rdd
.saveAsTextFile("s3://../adf")
Run Code Online (Sandbox Code Playgroud)

但观察到相同的输出。

我可以想象一种方法是\n用其他东西代替,然后在装回时执行反向功能。但是,有没有一种方法可以按期望的方式保存它,而无需对数据进行任何形式的转换?

puh*_*len 7

假设正确引用了多行数据,则可以使用univocity解析器和multiLine设置来解析多行csv数据

sparkSession.read
  .option("parserLib", "univocity")
  .option("multiLine", "true")
  .csv(file)
Run Code Online (Sandbox Code Playgroud)

请注意,这需要将整个文件作为单个执行程序读取,并且如果您的数据太大,则可能无法正常工作。标准文本文件读取将在执行任何其他解析之前按行将文件拆分,这将阻止您使用包含换行符的数据记录,除非可以使用其他记录定界符。如果不是,则可能需要实现自定义TextInputFormat来处理多行记录。


Avi*_*rya 2

默认情况下,spark saveTextFile 如果遇到 \n 会考虑不同的行。这与 csv 相同。在 csv 读取中,您可以使用 option("delimiter", "\t") 指定分隔符。

在我看来,读取多行输入的最佳方法是通过 hadoopAPI。您可以指定自己的分隔符并处理数据。

像这样的东西:

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val conf = new Configuration
conf.set("textinputformat.record.delimiter", "<your delimiter>")
val data: RDD[(LongWritable, Text)] =spark.sparkContext.newAPIHadoopFile(<"filepath">, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
Run Code Online (Sandbox Code Playgroud)

这里的数据文本是分隔符分隔的字符串