Upsert from AWS Glue to Amazon Redshift

Arp*_*ngh 2 amazon-web-services amazon-redshift aws-glue

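I know there is no direct UPSERT query that can be run from Glue against Redshift. Is it possible to implement the staging-table approach inside the Glue script itself?

What I want is to create a staging table, merge it into the target table, and finally drop it. Can this be done in a Glue script?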

Yur*_*ruk 6

Yes, you can upsert into Redshift from Glue with a staging table by passing the 'postactions' option to the JDBC sink:

val destinationTable = "upsert_test"
val destination = s"dev_sandbox.${destinationTable}"
val staging = s"dev_sandbox.${destinationTable}_staging"

val fields = datasetDf.toDF().columns.mkString(",")

val postActions =
  s"""
     DELETE FROM $destination USING $staging AS S
        WHERE $destinationTable.id = S.id
          AND $destinationTable.date = S.date;
     INSERT INTO $destination ($fields) SELECT $fields FROM $staging;
     DROP TABLE IF EXISTS $staging
  """

// Write data to staging table in Redshift
glueContext.getJDBCSink(
  catalogConnection = "redshift-glue-connections-test",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> staging,
    "overwrite" -> "true",
    "postactions" -> postActions
  )),
  redshiftTmpDir = s"$tempDir/redshift",
  transformationContext = "redshift-output"
).writeDynamicFrame(datasetDf)

Make sure the user that writes to Redshift has sufficient permissions to create and drop tables in the staging schema.
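For readers working in Python rather than Scala, the same post-action SQL can be assembled with a small helper. This is only a sketch under assumptions: the function name build_upsert_postactions and the "value" column are hypothetical, and the key columns (id, date) are taken from the snippet above. It builds the string only; passing it to the sink is unchanged.

```python
def build_upsert_postactions(destination, staging, columns, key_columns):
    """Return the SQL to run after the staging load: delete matches, insert, drop."""
    # Match rows on every key column; the staging table is aliased as s.
    match = " AND ".join(f"{destination}.{k} = s.{k}" for k in key_columns)
    fields = ", ".join(columns)
    return (
        f"DELETE FROM {destination} USING {staging} AS s WHERE {match}; "
        f"INSERT INTO {destination} ({fields}) SELECT {fields} FROM {staging}; "
        f"DROP TABLE IF EXISTS {staging};"
    )


# Hypothetical column list; in a real job it would come from the frame's schema.
post_actions = build_upsert_postactions(
    destination="dev_sandbox.upsert_test",
    staging="dev_sandbox.upsert_test_staging",
    columns=["id", "date", "value"],
    key_columns=["id", "date"],
)
print(post_actions)
```

Keeping the key columns in a list makes it harder to forget one of them in the DELETE condition, which would otherwise delete more rows than intended.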


Vzz*_*arr 5

Apparently the connection_options dictionary argument of glueContext.write_dynamic_frame.from_jdbc_conf accepts two interesting keys: preactions and postactions.

target_table = "my_schema.my_table"
stage_table = "my_schema.#my_table_stage_table"


pre_query = """
    drop table if exists {stage_table};
    create table {stage_table} as select * from {target_table} LIMIT 0;""".format(stage_table=stage_table, target_table=target_table)

post_query = """
    begin;
    delete from {target_table} using {stage_table} where {stage_table}.id = {target_table}.id ; 
    insert into {target_table} select * from {stage_table}; 
    drop table {stage_table}; 
    end;""".format(stage_table=stage_table, target_table=target_table)
    
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=datasource0,
    catalog_connection="test_red",
    connection_options={
        "preactions": pre_query,
        "postactions": post_query,
        "dbtable": stage_table,
        "database": "redshiftdb",
    },
    redshift_tmp_dir="s3://s3path",
    transformation_ctx="datasink4",
)

Based on https://aws.amazon.com/premiumsupport/knowledge-center/sql-commands-redshift-glue-job/
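If the target table is keyed on more than one column, the pre- and post-queries above can be generated instead of hand-written. The following is a minimal sketch, not part of the Glue API: build_connection_options, the stage table name my_schema.my_table_stage, and the updated_at key column are all assumptions made for illustration.

```python
def build_connection_options(target_table, stage_table, key_columns, database):
    """Assemble a connection_options dict for a staging-table upsert."""
    # Rows are considered the same when every key column matches.
    match = " and ".join(
        f"{stage_table}.{k} = {target_table}.{k}" for k in key_columns
    )
    # Runs before the load: recreate an empty stage table shaped like the target.
    pre_query = (
        f"drop table if exists {stage_table}; "
        f"create table {stage_table} as select * from {target_table} limit 0;"
    )
    # Runs after the load: replace matching rows inside one transaction.
    post_query = (
        "begin; "
        f"delete from {target_table} using {stage_table} where {match}; "
        f"insert into {target_table} select * from {stage_table}; "
        f"drop table {stage_table}; "
        "end;"
    )
    return {
        "preactions": pre_query,
        "postactions": post_query,
        "dbtable": stage_table,
        "database": database,
    }


# Hypothetical table and key names.
options = build_connection_options(
    target_table="my_schema.my_table",
    stage_table="my_schema.my_table_stage",
    key_columns=["id", "updated_at"],
    database="redshiftdb",
)
```

The resulting dict would be passed as connection_options to glueContext.write_dynamic_frame.from_jdbc_conf exactly as in the answer above.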