2 python apache-spark-sql pyspark
我在 pysparkdf
和data
. 架构如下所示
>>> df.printSchema()
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- address: string (nullable = true)
|-- nation: string (nullable = true)
|-- Date: timestamp (nullable = false)
|-- ZipCode: integer (nullable = true)
|-- car: string (nullable = true)
|-- van: string (nullable = true)
>>> data.printSchema()
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- address: string (nullable = true)
|-- nation: string (nullable = true)
|-- date: string (nullable = true)
|-- zipcode: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)
现在我想data
通过比较两个架构来将 car 和 van 列添加到我的数据框中。
如果列相同,我还想比较两个数据框,但如果列不同,则将列添加到没有列的数据框中。
我们如何在 pyspark 中实现这一点?
仅供参考,我正在使用 Spark 1.6
将列添加到数据框中后。新添加的数据框中这些列的值应该为空。
例如,这里我们向
data
数据框中添加列,因此数据数据框中的 car 和 van 列应包含空值,但 df 数据框中的相同列应具有其原始值如果要添加超过 2 个新列,会发生什么情况
由于模式不是由 StructFields 列表组成的 StructType,因此我们可以检索字段列表,以比较并找到丢失的列,
df_schema = df.schema.fields
data_schema = data.schema.fields
df_names = [x.name.lower() for x in df_scehma]
data_names = [x.name.lower() for x in data_schema]
if df_schema <> data_schema:
col_diff = set(df_names) ^ set(data_names)
col_list = [(x[0].name,x[0].dataType) for x in map(None,df_schema,data_schema) if ((x[0] is not None and x[0].name.lower() in col_diff) or x[1].name.lower() in col_diff)]
for i in col_list:
if i[0] in df_names:
data = data.withColumn("%s"%i[0],lit(None).cast(i[1]))
else:
df = df.withColumn("%s"%i[0],lit(None).cast(i[1]))
else:
print "Nothing to do"
Run Code Online (Sandbox Code Playgroud)
您提到过如果没有空值则添加列,但您的架构差异是可为空的列,因此未使用该检查。如果您需要它,请添加可空检查,如下所示,
col_list = [(x[0].name,x[0].dataType) for x in map(None,df_schema,data_schema) if (x[0].name.lower() in col_diff or x[1].name.lower() in col_diff) and not x.nullable]
Run Code Online (Sandbox Code Playgroud)
请查看文档以了解有关 StructType 和 StructFields 的更多信息, https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.types.StructType
归档时间: |
|
查看次数: |
10589 次 |
最近记录: |