python dataframe pandas apache-spark pyspark
Main goal
Show or select columns from a Spark dataframe read from a parquet file. All the solutions mentioned in the forum were unsuccessful in our case.
The problem
The issue occurs when reading and querying a parquet file with Spark, because the column names contain special characters among ` ,;{}()\n\t=`. The problem was reproduced with a simple parquet file with two columns and five rows. The names of the columns are:

SpeedReference_Final_01 (RifVel_G0)
SpeedReference_Final_02 (RifVel_G1)

The error that appears is:
Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
We use PySpark with the Python language; the solutions we experimented with can be classified as follows:
Solutions based on column renaming - [spark.read.parquet + renaming of the obtained dataframe]
Several variants were tried:
withColumnRenamed (Issue N.2 in the script below)
toDF (Issue N.3)
alias (Issue N.5)
None of them worked in our case.
Reading the parquet file into a Pandas dataframe and then creating a new Spark dataframe from it - [pd.read_parquet + spark.createDataFrame]
This solution works with a small parquet file (Issue N.0, i.e. the workaround inside the script below): the created Spark dataframe can even be queried successfully even though its column names contain special characters. Unfortunately it is impractical for our large parquet files (600000 rows x 1000 columns per parquet), since creating the Spark dataframe takes forever.
Trying to read the parquet file into a Spark dataframe and then creating a new Spark dataframe from its rdd and a renamed schema is not viable either, since extracting the rdd from the Spark dataframe raises the same error (Issue N.4).
Reading the parquet file with a predefined schema that avoids the special characters - [spark.read.schema(...).parquet]
This solution does not work: the data associated with the key columns becomes null/None, as expected, because the renamed columns do not exist in the original file.
The solutions above are summarized in the Python code below and have been experimented with on the sample parquet file.
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col
import pandas as pd

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Select file
filename = 'D:/Simple.parquet'

issue_num = 0  # Workaround to issues (equivalent to no issue)
#issue_num = 1 # Issue 1 - Unable to show dataframe or select column with name containing invalid character(s)
#issue_num = 2 # Issue 2 - Unable to show dataframe or select column after rename (using withColumnRenamed)
#issue_num = 3 # Issue 3 - Unable to show dataframe or select column after rename (using toDF)
#issue_num = 4 # Issue 4 - Unable to extract rdd from renamed dataframe
#issue_num = 5 # Issue 5 - Unable to select column with alias

if issue_num == 0:
    ################################################################################################
    # WORKAROUND - Create Spark dataframe from Pandas dataframe
    df_pd = pd.read_parquet(filename)
    DF = spark.createDataFrame(df_pd)
    print('WORKAROUND')
    DF.show()
    # +-----------------------------------+-----------------------------------+
    # |SpeedReference_Final_01 (RifVel_G0)|SpeedReference_Final_02 (RifVel_G1)|
    # +-----------------------------------+-----------------------------------+
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # +-----------------------------------+-----------------------------------+

    ################################################################################################
    # Correct management of columns with invalid characters when using spark.createDataFrame
    # spark.createDataFrame: create a dataframe with two columns with invalid characters - OK
    # DFCREATED
    schema = StructType(
        [
            StructField("SpeedReference_Final_01 (RifVel_G0)", FloatType(), nullable=True),
            StructField("SpeedReference_Final_02 (RifVel_G1)", FloatType(), nullable=True)
        ]
    )

    row_in = [(553.523, 720.372), (553.523, 720.372), (553.523, 720.372),
              (553.523, 720.372), (553.523, 720.372)]
    rdd = spark.sparkContext.parallelize(row_in)
    DFCREATED = spark.createDataFrame(rdd, schema)
    DFCREATED.show()
    # +-----------------------------------+-----------------------------------+
    # |SpeedReference_Final_01 (RifVel_G0)|SpeedReference_Final_02 (RifVel_G1)|
    # +-----------------------------------+-----------------------------------+
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # +-----------------------------------+-----------------------------------+

    DF_SEL_VAR_CREATED = DFCREATED.select(DFCREATED.columns[0]).take(2)
    for el in DF_SEL_VAR_CREATED:
        print(el)
    # Row(SpeedReference_Final_01 (RifVel_G0)=553.5230102539062)
    # Row(SpeedReference_Final_01 (RifVel_G0)=553.5230102539062)
else:
    # spark.read: read file into dataframe - OK
    DF = spark.read.parquet(filename)
    print('ORIGINAL SCHEMA')
    DF.printSchema()
    # root
    #  |-- SpeedReference_Final_01 (RifVel_G0): float (nullable = true)
    #  |-- SpeedReference_Final_02 (RifVel_G1): float (nullable = true)

    if issue_num == 1:
        ###############################################################################################
        # Issue 1 - Unable to show dataframe or select column with name containing invalid character(s)
        DF.show()
        # DF.select(DF.columns[0]).show()
        # DF_SEL_VAR = DF.select(DF.columns[0]).take(3)
        # ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
        #      on all 3 previous statements
    elif issue_num == 2:
        ###############################################################################################
        # Issue 2 - Unable to show dataframe or select column after rename (using withColumnRenamed)
        DFRENAMED = DF.withColumnRenamed('SpeedReference_Final_01 (RifVel_G0)', 'RifVelG0') \
                      .withColumnRenamed('SpeedReference_Final_02 (RifVel_G1)', 'RifVelG1')
        print('RENAMED SCHEMA')
        DFRENAMED.printSchema()
        # root
        #  |-- RifVelG0: float (nullable = true)
        #  |-- RifVelG1: float (nullable = true)
        DFRENAMED.show()
        # DF_SEL_VAR_RENAMED = DFRENAMED.select(DFRENAMED.RifVelG0).take(2)
        # ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
        #      on both previous statements
    elif issue_num == 3:
        ###############################################################################################
        # Issue 3 - Unable to show dataframe or select column after rename (using toDF)
        DFRENAMED = DF.toDF('RifVelG0', 'RifVelG1')
        print('RENAMED SCHEMA')
        DFRENAMED.printSchema()
        # root
        #  |-- RifVelG0: float (nullable = true)
        #  |-- RifVelG1: float (nullable = true)
        DFRENAMED.show()
        # DF_SEL_VAR_RENAMED = DFRENAMED.select(DFRENAMED.RifVelG0).take(2)
        # ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
        #      on both previous statements
    elif issue_num == 4:
        ###############################################################################################
        # Issue 4 - Unable to extract rdd from renamed dataframe
        DFRENAMED = DF.withColumnRenamed('SpeedReference_Final_01 (RifVel_G0)', 'RifVelG0') \
                      .withColumnRenamed('SpeedReference_Final_02 (RifVel_G1)', 'RifVelG1')
        DFRENAMED_rdd = DFRENAMED.rdd
        # ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
    elif issue_num == 5:
        ###############################################################################################
        # Issue 5 - Unable to select column with alias
        DF_SEL_VAR = DF.select(col(DF.columns[0]).alias('RifVelG0')).take(3)
        # ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
Do you have any idea how we can solve this problem?
Any suggestion is greatly appreciated.
Answer (by 小智):
Try something like this:
import re
import pyspark.sql.functions as f

def remove_special_characters(string: str):
    return re.sub("[^a-zA-Z0-9 ]", "", string)

DFCREATED = DFCREATED.select(
    [
        f.col(column).alias(remove_special_characters(column))
        for column in DFCREATED.columns
    ]
)
I also think you can adapt this function to remove other things, such as the space.
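A quick check (mine, not from the original answer) of how this pattern behaves on the column names from the question: underscores are stripped along with the parentheses, while the space, which Spark also rejects, is kept, so adapting the character class as suggested is indeed necessary:

```python
import re

def remove_special_characters(string: str):
    return re.sub("[^a-zA-Z0-9 ]", "", string)

print(remove_special_characters("SpeedReference_Final_01 (RifVel_G0)"))
# -> SpeedReferenceFinal01 RifVelG0   (underscores gone, space still present)

# Dropping the space from the keep-list removes it as well:
print(re.sub("[^a-zA-Z0-9]", "", "SpeedReference_Final_01 (RifVel_G0)"))
# -> SpeedReferenceFinal01RifVelG0
```

Note that an aggressive sanitizer like this can map two distinct column names to the same result, so it is worth checking the renamed list for duplicates.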