jam*_*iet 6 python apache-spark pyspark
Given a PySpark DataFrame, is it possible to obtain a list of the source columns that the DataFrame references?
Perhaps a more concrete example will help explain what I'm after. Suppose I have a DataFrame defined as:
import pyspark.sql.functions as func
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
source_df = spark.createDataFrame(
[("pru", 23, "finance"), ("paul", 26, "HR"), ("noel", 20, "HR")],
["name", "age", "department"],
)
source_df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT name, age, department FROM people")
df = sqlDF.groupBy("department").agg(func.max("age").alias("max_age"))
df.show()
which returns:
+----------+-------+
|department|max_age|
+----------+-------+
|   finance|     23|
|        HR|     26|
+----------+-------+
The columns referenced by df are [department, age]. Is it possible to get that list of referenced columns programmatically?
Thanks to Capturing the result of explain() in pyspark, I know I can extract the plan as a string, which in this case is:
== Physical Plan ==
AdaptiveSparkPlan (6)
+- HashAggregate (5)
   +- Exchange (4)
      +- HashAggregate (3)
         +- Project (2)
            +- Scan ExistingRDD (1)
(1) Scan ExistingRDD
Output [3]: [name#0, age#1L, department#2]
Arguments: [name#0, age#1L, department#2], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Project
Output [2]: [age#1L, department#2]
Input [3]: [name#0, age#1L, department#2]

(3) HashAggregate
Input [2]: [age#1L, department#2]
Keys [1]: [department#2]
Functions [1]: [partial_max(age#1L)]
Aggregate Attributes [1]: [max#22L]
Results [2]: [department#2, max#23L]

(4) Exchange
Input [2]: [department#2, max#23L]
Arguments: hashpartitioning(department#2, 200), ENSURE_REQUIREMENTS, [plan_id=60]

(5) HashAggregate
Input [2]: [department#2, max#23L]
Keys [1]: [department#2]
Functions [1]: [max(age#1L)]
Aggregate Attributes [1]: [max(age#1L)#12L]
Results [2]: [department#2, max(age#1L)#12L AS max_age#13L]

(6) AdaptiveSparkPlan
Output [2]: [department#2, max_age#13L]
Arguments: isFinalPlan=false
That is useful, but it isn't what I need. I need the list of referenced columns. Is that possible?
Perhaps another way of asking the question is: is there a way to get the explain plan as an object that I can iterate over and explore?
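One possible avenue, as a minimal sketch assuming Spark 3.x: the JVM plan objects are reachable through py4j via df._jdf.queryExecution(), and Spark's internal TreeNode.toJSON can serialise a plan into plain data. This relies on private, version-dependent API, so treat it as an illustration rather than a supported interface:

import json

# Serialise the optimised logical plan (a JVM TreeNode) to JSON and load it
# back as ordinary Python data. toJSON is internal API and may change
# between Spark versions.
plan_nodes = json.loads(df._jdf.queryExecution().optimizedPlan().toJSON())

# plan_nodes is a list of dicts, one per plan operator, that can be
# iterated over and explored with ordinary Python.
for node in plan_nodes:
    print(node["class"], node["num-children"])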
Update: thanks to @matt-andruff's reply, I've now got this:
df._sc._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), "formatted")
which returns the same formatted plan string as shown above.
I suppose I could parse the information I want out of that string, but that would be far from elegant and particularly error-prone.
What I'm really after is a fail-safe, reliable, API-supported way to get this information. I'm starting to think it just isn't possible.
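For completeness, here is a sketch of how the referenced columns might be pulled out of the plan objects themselves rather than parsed from the string. It walks the optimised logical plan through py4j; method names such as references(), children() and name() mirror Spark's internal Scala classes, so this is an assumption that holds for Spark 3.x, not a supported API:

def referenced_columns(df):
    """Collect every attribute name referenced by any operator in the
    optimised logical plan (column pruning has already dropped columns
    the query never uses). Relies on Spark internals via py4j."""
    def walk(node, acc):
        it = node.references().iterator()   # AttributeSet -> Iterator[Attribute]
        while it.hasNext():
            acc.add(it.next().name())
        kids = node.children()              # Seq[LogicalPlan]
        for i in range(kids.size()):
            walk(kids.apply(i), acc)
        return acc
    return walk(df._jdf.queryExecution().optimizedPlan(), set())

print(referenced_columns(df))               # expected here: {'age', 'department'}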
小智 -1
You can try the following code, which will give you the list of columns in the DataFrame together with their data types.
for field in df.schema.fields:
    print(field.name + " , " + str(field.dataType))
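For the aggregated df above this prints something like (the exact dataType repr varies by PySpark version):

department , StringType
max_age , LongType

Note that this lists the columns of df itself (department, max_age), not the source columns (department, age) that the question asks about.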