Mpi*_*ris 6 scala apache-spark
我有一个数据框,其中包含一些多行观察结果:
+--------------------+----------------+
| col1| col2|
+--------------------+----------------+
|something1 |somethingelse1 |
|something2 |somethingelse2 |
|something3 |somethingelse3 |
|something4 |somethingelse4 |
|multiline
row | somethings|
|something |somethingall |
Run Code Online (Sandbox Code Playgroud)
我想要的是以该数据帧的csv格式(或txt)保存。使用以下内容:
df
.write
.format("csv")
.save("s3://../adf/")
Run Code Online (Sandbox Code Playgroud)
但是,当我检查文件时,它将观察结果分成多行。我想要的是在txt / csv文件中具有“多行”观测值的行是同一行。我试图将其另存为txt文件:
df
.as[(String,String)]
.rdd
.saveAsTextFile("s3://../adf")
Run Code Online (Sandbox Code Playgroud)
但观察到相同的输出。
我可以想象一种方法是\n用其他东西代替,然后在装回时执行反向功能。但是,有没有一种方法可以按期望的方式保存它,而无需对数据进行任何形式的转换?
假设正确引用了多行数据,则可以使用univocity解析器和multiLine设置来解析多行csv数据
sparkSession.read
.option("parserLib", "univocity")
.option("multiLine", "true")
.csv(file)
Run Code Online (Sandbox Code Playgroud)
请注意,这需要将整个文件作为单个执行程序读取,并且如果您的数据太大,则可能无法正常工作。标准文本文件读取将在执行任何其他解析之前按行将文件拆分,这将阻止您使用包含换行符的数据记录,除非可以使用其他记录定界符。如果不是,则可能需要实现自定义TextInputFormat来处理多行记录。
默认情况下,spark saveTextFile 如果遇到 \n 会考虑不同的行。这与 csv 相同。在 csv 读取中,您可以使用 option("delimiter", "\t") 指定分隔符。
在我看来,读取多行输入的最佳方法是通过 hadoopAPI。您可以指定自己的分隔符并处理数据。
像这样的东西:
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val conf = new Configuration
conf.set("textinputformat.record.delimiter", "<your delimiter>")
val data: RDD[(LongWritable, Text)] =spark.sparkContext.newAPIHadoopFile(<"filepath">, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
Run Code Online (Sandbox Code Playgroud)
这里的数据文本是分隔符分隔的字符串
| 归档时间: |
|
| 查看次数: |
6786 次 |
| 最近记录: |