Har*_*h J 5 apache-spark pyspark aws-glue delta-lake
我在 AWS Glue 环境中工作。我从 Glue 目录中读取数据作为动态数据帧,并将其转换为 Pyspark 数据帧以进行自定义转换。为了更新插入新的/更新的数据,我打算使用增量表。
但我只找到从路径读取数据作为增量表的选项。我需要将 Pyspark 数据帧转换为 Delta 表以进行合并操作。有什么办法可以做到这一点吗?
您只需要一个目标表作为增量表。您计划合并的数据不需要是增量表。这实际上取决于您使用的 API:
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/data/events/")
updatesDF = .... # your transformed dataframe
deltaTable.alias("target").merge(
updatesDF.alias("updates"),
"target.col1 = updates.col1") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
Run Code Online (Sandbox Code Playgroud)
updates_df.createOrReplaceTempView(updates)
merge_sql = f"""
merge into target
using updates
ON source.col1 == target.col1
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
updates_df._jdf.sparkSession().sql(merge_sql)
Run Code Online (Sandbox Code Playgroud)
这里唯一的问题是您需要df._jdf.sparkSession().sql在注册临时视图的同一上下文中执行 SQL 命令。
| 归档时间: |
|
| 查看次数: |
13659 次 |
| 最近记录: |