Abh*_*jit 7 python sql scala apache-spark pyspark
要从SQL查询获取表名,
select *
from table1 as t1
full outer join table2 as t2
on t1.id = t2.id
Run Code Online (Sandbox Code Playgroud)
我在Scala中找到了解决方案如何从SQL查询获取表名?
def getTables(query: String): Seq[String] = {
val logicalPlan = spark.sessionState.sqlParser.parsePlan(query)
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
logicalPlan.collect { case r: UnresolvedRelation => r.tableName }
}
Run Code Online (Sandbox Code Playgroud)
当我遍历返回序列时,这为我提供了正确的表名 getTables(query).foreach(println)
table1
table2
Run Code Online (Sandbox Code Playgroud)
PySpark的等效语法是什么?我遇到的最接近的是 如何从pyspark中的SQL中提取列名和列类型
table1
table2
Run Code Online (Sandbox Code Playgroud)
追溯失败
Py4JError: An error occurred while calling o78.tableDesc. Trace:
py4j.Py4JException: Method tableDesc([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:835)
Run Code Online (Sandbox Code Playgroud)
我了解,问题源于以下事实:我需要过滤所有类型的计划项目,
UnresolvedRelation但无法在python / pyspark中找到等效的符号
我有一个方法,但相当复杂。它转储 Java 对象和 JSON(穷人的序列化过程),将其反序列化为 python 对象,过滤并解析表名称
import json
def get_tables(query: str):
plan = spark._jsparkSession.sessionState().sqlParser().parsePlan(query)
plan_items = json.loads(plan.toJSON())
for plan_item in plan_items:
if plan_item['class'] == 'org.apache.spark.sql.catalyst.analysis.UnresolvedRelation':
yield plan_item['tableIdentifier']['table']
Run Code Online (Sandbox Code Playgroud)
['fast_track_gv_nexus', 'buybox_gv_nexus']当我迭代该函数时会产生list(get_tables(query))
注意不幸的是,这会破坏CTE
例子
with delta as (
select *
group by id
cluster by id
)
select *
from ( select *
FROM
(select *
from dmm
inner join delta on dmm.id = delta.id
)
)
Run Code Online (Sandbox Code Playgroud)
为了解决这个问题,我必须通过正则表达式进行修改
import json
import re
def get_tables(query: str):
plan = spark._jsparkSession.sessionState().sqlParser().parsePlan(query)
plan_items = json.loads(plan.toJSON())
plan_string = plan.toString()
cte = re.findall(r"CTE \[(.*?)\]", plan_string)
for plan_item in plan_items:
if plan_item['class'] == 'org.apache.spark.sql.catalyst.analysis.UnresolvedRelation':
tableIdentifier = plan_item['tableIdentifier']
table = plan_item['tableIdentifier']['table']
database = tableIdentifier.get('database', '')
table_name = "{}.{}".format(database, table) if database else table
if table_name not in cte:
yield table_name
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
224 次 |
| 最近记录: |