如何使用 AWS Glue 运行任意/DDL SQL 语句或存储过程

mis*_*kin 11 py4j pyspark aws-glue

是否可以从 AWS Glue python 作业执行任意 SQL 命令,例如 ALTER TABLE?我知道我可以用它从表中读取数据,但是有没有办法执行其他数据库特定的命令?

我需要将数据提取到目标数据库中,然后立即运行一些 ALTER 命令。

mis*_*kin 14

因此,在进行了广泛的研究并在 AWS 支持下打开了一个案例后,他们告诉我目前无法通过 Python shell 或 Glue pyspark 作业进行操作。但我只是尝试了一些有创意的东西,它奏效了!这个想法是使用 Sparks 已经依赖的 py4j 并利用标准的 java sql 包。

这种方法的两大好处:

  1. 这样做的一个巨大好处是,您可以将数据库连接定义为 Glue 数据连接,并将 jdbc 详细信息和凭据保留在其中,而无需将它们硬编码在 Glue 代码中。我下面的示例通过调用glueContext.extract_jdbc_conf('your_glue_data_connection_name')获取 Glue 中定义的 jdbc url 和凭据来实现这一点。

  2. 如果您需要在受支持的现成 Glue 数据库上运行 SQL 命令,您甚至不需要为该数据库使用/传递 jdbc 驱动程序 - 只需确保为该数据库设置 Glue 连接并将该连接添加到您的数据库Glue 作业 - Glue 将上传正确的数据库驱动程序 jar。

请记住,下面的代码是由驱动程序进程执行的,不能由 Spark 工作线程/执行程序执行。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

logger = glueContext.get_logger()

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# dw-poc-dev spark test
source_jdbc_conf = glueContext.extract_jdbc_conf('your_glue_database_connection_name')

from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")

conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url'), source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))

print(conn.getMetaData().getDatabaseProductName())

# call stored procedure, in this case I call sp_start_job
cstmt = conn.prepareCall("{call dbo.sp_start_job(?)}");
cstmt.setString("job_name", "testjob");
results = cstmt.execute();

conn.close()
Run Code Online (Sandbox Code Playgroud)


小智 7

几个小时后我终于开始工作了,希望以下内容对您有所帮助。我的剧本深受早期回复的影响,谢谢。

先决条件:

  • 在尝试任何脚本之前,您需要配置并测试 Glue 连接。
  • 设置 AWS Glue 作业时,请使用 Spark、Glue 版本 2.0 或更高版本以及 Python 版本 3。
  • 我建议将此作业配置为仅 2 个工作线程以节省成本;大部分工作将由数据库完成,而不是通过胶水完成。
  • 以下内容是使用 AWS RDS PostgreSQL 实例进行测试的,但希望足够灵活,可以适用于其他数据库。
  • 该脚本需要在脚本顶部附近更新 3 个参数(glue_connection_name、database_name 和stored_proc)。
  • JOB_NAME、连接字符串和凭据由脚本检索,不需要提供。
  • 如果您的存储过程将返回数据集,则将executeUpdate 替换为executeQuery。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
    
glue_connection_name = '[Name of your glue connection (not the job name)]'
database_name = '[name of your postgreSQL database]'
stored_proc = '[Stored procedure call, for example public.mystoredproc()]'
    
#Below this point no changes should be necessary.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glue_job_name = args['JOB_NAME']
    
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(glue_job_name, args)
job.commit()
    
logger = glueContext.get_logger()
    
logger.info('Getting details for connection ' + glue_connection_name)
source_jdbc_conf = glueContext.extract_jdbc_conf(glue_connection_name)
    
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")
    
conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url') + '/' + database_name, source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
logger.info('Connected to ' + conn.getMetaData().getDatabaseProductName() + ', ' + source_jdbc_conf.get('url') + '/' + database_name)
    
stmt = conn.createStatement();
rs = stmt.executeUpdate('call ' + stored_proc);
    
logger.info("Finished")
Run Code Online (Sandbox Code Playgroud)