如何使用 PySpark 读取文本文件并应用架构?

Dav*_*SFT 4 python apache-spark pyspark

.txt 文件如下所示:

1234567813572468
1234567813572468
1234567813572468
1234567813572468
1234567813572468
Run Code Online (Sandbox Code Playgroud)

当我读入它并分类为 3 个不同的列时,我返回这个(完美):

1234567813572468
1234567813572468
1234567813572468
1234567813572468
1234567813572468
Run Code Online (Sandbox Code Playgroud)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
Run Code Online (Sandbox Code Playgroud)

但是,如果我再次阅读它并应用模式......

df = spark.read.option("header"     , "false")\
               .option("inferSchema", "true" )\
               .text( "fixed-width-2.txt"    )

sorted_df = df.select(
    df.value.substr(1, 4).alias('col1'),
    df.value.substr(5, 4).alias('col2'),
    df.value.substr(8, 4).alias('col3'),
).show()
Run Code Online (Sandbox Code Playgroud)

文件中的数据消失了:


+----+----+----+
|col1|col2|col3|
+----+----+----+
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
Run Code Online (Sandbox Code Playgroud)

所以我的问题是,如何读取此文本文件并应用架构?

Shu*_*Shu 7

当使用模式读取时,col1因为int该值超过了1234567813572468max int 值。而是用 来阅读LongType

schema = StructType([StructField('col1', LongType(), True)])
spark.read.csv("path",schema=schema).show()
#+----------------+
#|            col1|
#+----------------+
#|1234567813572468|
#|1234567813572468|
#|1234567813572468|
#|1234567813572468|
#|1234567813572468|
#+----------------+
Run Code Online (Sandbox Code Playgroud)

Using RDD Api:

更简单的方法是使用.textFile(结果为 rdd)读取固定宽度文件,然后使用应用转换,然后使用模式.map转换为数据帧。


from pyspark.sql.types import *
schema = StructType([StructField('col1', IntegerType(), True),
                     StructField('col2', IntegerType(), True),
                     StructField('col3', IntegerType(), True)])
df=spark.createDataFrame(
spark.sparkContext.textFile("fixed_width.csv").\
map(lambda x:(int(x[0:4]),int(x[4:8]),int(x[8:12]))),schema)

df.show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#+----+----+----+

df.printSchema()
#root
# |-- col1: integer (nullable = true)
# |-- col2: integer (nullable = true)
# |-- col3: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

Using DataFrame Api:

df = spark.read.option("header"     , "false")\
               .option("inferSchema", "true" )\
               .text( "path")

sorted_df = df.select(
    df.value.substr(1, 4).alias('col1'),
    df.value.substr(5, 4).alias('col2'),
    df.value.substr(8, 4).alias('col3'),
)
#dynamic cast expression
casting=[(col(col_name).cast("int")).name(col_name) for col_name in sorted_df.columns]
sorted_df=sorted_df.select(casting)

#required dataframe
sorted_df.show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#+----+----+----+

#just in case if you want to change the types
schema = StructType([StructField('col1', IntegerType(), True),
                     StructField('col2', IntegerType(), True),
                     StructField('col3', IntegerType(), True)])

df=spark.createDataFrame(sorted_df.rdd,schema)
df.show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#+----+----+----+
Run Code Online (Sandbox Code Playgroud)

  • 这正是我一直在寻找的东西!也感谢您如此彻底,并提供了第二种方法,因为它帮助我理解了解决此问题的多种方法。对于其他人,我在 Jupyter 笔记本中运行,并发现 PySpark 中未找到“col”,如上面示例中所使用的。为了解决这个问题,我使用了这篇文章:/sf/ask/2811417451/ (2认同)