Dav*_*SFT 4 python apache-spark pyspark
.txt 文件如下所示:
1234567813572468
1234567813572468
1234567813572468
1234567813572468
1234567813572468
Run Code Online (Sandbox Code Playgroud)
当我读入它并分类为 3 个不同的列时,我返回这个(完美):
1234567813572468
1234567813572468
1234567813572468
1234567813572468
1234567813572468
Run Code Online (Sandbox Code Playgroud)
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
Run Code Online (Sandbox Code Playgroud)
但是,如果我再次阅读它并应用模式......
df = spark.read.option("header" , "false")\
.option("inferSchema", "true" )\
.text( "fixed-width-2.txt" )
sorted_df = df.select(
df.value.substr(1, 4).alias('col1'),
df.value.substr(5, 4).alias('col2'),
df.value.substr(8, 4).alias('col3'),
).show()
Run Code Online (Sandbox Code Playgroud)
文件中的数据消失了:
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
Run Code Online (Sandbox Code Playgroud)
所以我的问题是,如何读取此文本文件并应用架构?
当使用模式读取时,col1因为int该值超过了1234567813572468max int 值。而是用 来阅读LongType。
schema = StructType([StructField('col1', LongType(), True)])
spark.read.csv("path",schema=schema).show()
#+----------------+
#| col1|
#+----------------+
#|1234567813572468|
#|1234567813572468|
#|1234567813572468|
#|1234567813572468|
#|1234567813572468|
#+----------------+
Run Code Online (Sandbox Code Playgroud)
Using RDD Api:
更简单的方法是使用.textFile(结果为 rdd)读取固定宽度文件,然后使用应用转换,然后使用模式.map转换为数据帧。
from pyspark.sql.types import *
schema = StructType([StructField('col1', IntegerType(), True),
StructField('col2', IntegerType(), True),
StructField('col3', IntegerType(), True)])
df=spark.createDataFrame(
spark.sparkContext.textFile("fixed_width.csv").\
map(lambda x:(int(x[0:4]),int(x[4:8]),int(x[8:12]))),schema)
df.show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#|1234|5678|1357|
#+----+----+----+
df.printSchema()
#root
# |-- col1: integer (nullable = true)
# |-- col2: integer (nullable = true)
# |-- col3: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)
Using DataFrame Api:
df = spark.read.option("header" , "false")\
.option("inferSchema", "true" )\
.text( "path")
sorted_df = df.select(
df.value.substr(1, 4).alias('col1'),
df.value.substr(5, 4).alias('col2'),
df.value.substr(8, 4).alias('col3'),
)
#dynamic cast expression
casting=[(col(col_name).cast("int")).name(col_name) for col_name in sorted_df.columns]
sorted_df=sorted_df.select(casting)
#required dataframe
sorted_df.show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#+----+----+----+
#just in case if you want to change the types
schema = StructType([StructField('col1', IntegerType(), True),
StructField('col2', IntegerType(), True),
StructField('col3', IntegerType(), True)])
df=spark.createDataFrame(sorted_df.rdd,schema)
df.show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#|1234|5678|8135|
#+----+----+----+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
15216 次 |
| 最近记录: |