Rak*_*van 10 apache-spark pyspark spark-dataframe pyspark-sql
我有一个以这种方式构建的CSV文件:
Header
Blank Row
"Col1","Col2"
"1,200","1,456"
"2,000","3,450"
Run Code Online (Sandbox Code Playgroud)
我在阅读此文件时遇到两个问题.
这是我尝试过的:
df = sc.textFile("myFile.csv")\
.map(lambda line: line.split(","))\ #Split By comma
.filter(lambda line: len(line) == 2).collect() #This helped me ignore the first two rows
Run Code Online (Sandbox Code Playgroud)
但是,这不起作用,因为值中的逗号被读作分隔符而len(line)返回4而不是2.
我尝试了另一种方法:
data = sc.textFile("myFile.csv")
headers = data.take(2) #First two rows to be skipped
Run Code Online (Sandbox Code Playgroud)
我的想法是使用过滤器而不是读取标题.但是,当我尝试打印标题时,我得到了编码值.
[\x00A\x00Y\x00 \x00J\x00u\x00l\x00y\x00 \x002\x000\x001\x006\x00]
Run Code Online (Sandbox Code Playgroud)
读取CSV文件并跳过前两行的正确方法是什么?
小智 7
尝试使用带有'quotechar'参数的csv.reader.它会正确分割行.之后,您可以根据需要添加过滤器.
import csv
from pyspark.sql.types import StringType
df = sc.textFile("test2.csv")\
.mapPartitions(lambda line: csv.reader(line,delimiter=',', quotechar='"')).filter(lambda line: len(line)>=2 and line[0]!= 'Col1')\
.toDF(['Col1','Col2'])
Run Code Online (Sandbox Code Playgroud)
对于您的第一个问题,只需在RDD中压缩行zipWithIndex并过滤您不想要的行.对于第二个问题,您可以尝试从行中删除第一个和最后一个双引号字符,然后拆分该行",".
rdd = sc.textFile("myfile.csv")
rdd.zipWithIndex().
filter(lambda x: x[1] > 2).
map(lambda x: x[0]).
map(lambda x: x.strip('"').split('","')).
toDF(["Col1", "Col2"])
Run Code Online (Sandbox Code Playgroud)
虽然,如果您正在寻找一种在Spark中处理CSV文件的标准方法,最好使用spark-csvdatabricks中的软件包.
Zlidime 的回答有正确的想法。工作解决方案是这样的:
import csv
customSchema = StructType([ \
StructField("Col1", StringType(), True), \
StructField("Col2", StringType(), True)])
df = sc.textFile("file.csv")\
.mapPartitions(lambda partition: csv.reader([line.replace('\0','') for line in partition],delimiter=',', quotechar='"')).filter(lambda line: len(line) > 2 and line[0] != 'Col1')\
.toDF(customSchema)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
14964 次 |
| 最近记录: |