如何获取 PySpark DataFrame 的引用列?

jam*_*iet 6 python apache-spark pyspark

给定PySpark DataFrame是否可以获得 DataFrame 引用的源列的列表?

也许一个更具体的例子可能有助于解释我所追求的。假设我有一个 DataFrame 定义为:

import pyspark.sql.functions as func
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
source_df = spark.createDataFrame(
    [("pru", 23, "finance"), ("paul", 26, "HR"), ("noel", 20, "HR")],
    ["name", "age", "department"],
)
source_df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT name, age, department FROM people")
df = sqlDF.groupBy("department").agg(func.max("age").alias("max_age"))
df.show()
Run Code Online (Sandbox Code Playgroud)

返回:

+----------+--------+                                                           
|department|max_age |
+----------+--------+
|   finance|      23|
|        HR|      26|
+----------+--------+
Run Code Online (Sandbox Code Playgroud)

引用的列df[department, age]。是否可以通过编程方式获取引用列的列表?

感谢在 pyspark 中捕获explain()的结果,我知道我可以将计划提取为字符串:

+----------+--------+                                                           
|department|max_age |
+----------+--------+
|   finance|      23|
|        HR|      26|
+----------+--------+
Run Code Online (Sandbox Code Playgroud)

返回:

== Physical Plan ==
AdaptiveSparkPlan (6)
+- HashAggregate (5)
   +- Exchange (4)
      +- HashAggregate (3)
         +- Project (2)
            +- Scan ExistingRDD (1)


(1) Scan ExistingRDD
Output [3]: [name#0, age#1L, department#2]
Arguments: [name#0, age#1L, department#2], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Project
Output [2]: [age#1L, department#2]
Input [3]: [name#0, age#1L, department#2]

(3) HashAggregate
Input [2]: [age#1L, department#2]
Keys [1]: [department#2]
Functions [1]: [partial_max(age#1L)]
Aggregate Attributes [1]: [max#22L]
Results [2]: [department#2, max#23L]

(4) Exchange
Input [2]: [department#2, max#23L]
Arguments: hashpartitioning(department#2, 200), ENSURE_REQUIREMENTS, [plan_id=60]

(5) HashAggregate
Input [2]: [department#2, max#23L]
Keys [1]: [department#2]
Functions [1]: [max(age#1L)]
Aggregate Attributes [1]: [max(age#1L)#12L]
Results [2]: [department#2, max(age#1L)#12L AS max_age#13L]

(6) AdaptiveSparkPlan
Output [2]: [department#2, max_age#13L]
Arguments: isFinalPlan=false
Run Code Online (Sandbox Code Playgroud)

这很有用,但不是我需要的。我需要引用列的列表。这可能吗?

也许提出问题的另一种方式是......有没有办法获得解释计划作为我可以迭代/探索的对象?


更新。感谢 @matt-andruff 的回复,我得到了这个:

df._sc._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), "formatted")
Run Code Online (Sandbox Code Playgroud)

返回:

== Physical Plan ==
AdaptiveSparkPlan (6)
+- HashAggregate (5)
   +- Exchange (4)
      +- HashAggregate (3)
         +- Project (2)
            +- Scan ExistingRDD (1)


(1) Scan ExistingRDD
Output [3]: [name#0, age#1L, department#2]
Arguments: [name#0, age#1L, department#2], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Project
Output [2]: [age#1L, department#2]
Input [3]: [name#0, age#1L, department#2]

(3) HashAggregate
Input [2]: [age#1L, department#2]
Keys [1]: [department#2]
Functions [1]: [partial_max(age#1L)]
Aggregate Attributes [1]: [max#22L]
Results [2]: [department#2, max#23L]

(4) Exchange
Input [2]: [department#2, max#23L]
Arguments: hashpartitioning(department#2, 200), ENSURE_REQUIREMENTS, [plan_id=60]

(5) HashAggregate
Input [2]: [department#2, max#23L]
Keys [1]: [department#2]
Functions [1]: [max(age#1L)]
Aggregate Attributes [1]: [max(age#1L)#12L]
Results [2]: [department#2, max(age#1L)#12L AS max_age#13L]

(6) AdaptiveSparkPlan
Output [2]: [department#2, max_age#13L]
Arguments: isFinalPlan=false
Run Code Online (Sandbox Code Playgroud)

我想我可以从中解析我想要的信息,但这远非优雅的方法,而且特别容易出错。

我真正追求的是一种故障安全、可靠、API 支持的方式来获取此信息。我开始认为这是不可能的。

小智 -1

您可以尝试以下代码,这将为您提供数据框中的列列表及其数据类型。

for field in df.schema.fields:
    print(field.name +" , "+str(field.dataType))
Run Code Online (Sandbox Code Playgroud)