sew*_*dth 3 apache-spark pyspark aws-glue aws-glue-spark
我有一个火花作业,它只会从具有相同转换的多个表中提取数据。基本上是一个遍历表列表的 for 循环,查询目录表,添加时间戳,然后推送到 Redshift(下面的示例)。
完成这项工作大约需要 30 分钟。有没有办法在相同的火花/胶水环境下并行运行这些?如果可以避免的话,我不想创建单独的胶水作业。
import datetime
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import *
# query the runtime arguments
args = getResolvedOptions(
sys.argv,
["JOB_NAME", "redshift_catalog_connection", "target_database", "target_schema"],
)
# build the job session and context
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# set the job execution timestamp
job_execution_timestamp = datetime.datetime.utcnow()
tables = []
for table in tables:
catalog_table = glueContext.create_dynamic_frame.from_catalog(
database="test", table_name=table, transformation_ctx=table
)
data_set = catalog_table.toDF().withColumn(
"batchLoadTimestamp", lit(job_execution_timestamp)
)
# covert back to glue dynamic frame
export_frame = DynamicFrame.fromDF(data_set, glueContext, "export_frame")
# remove null rows from dynamic frame
non_null_records = DropNullFields.apply(
frame=export_frame, transformation_ctx="non_null_records"
)
temp_dir = os.path.join(args["TempDir"], redshift_table_name)
stores_redshiftSink = glueContext.write_dynamic_frame.from_jdbc_conf(
frame=non_null_records,
catalog_connection=args["redshift_catalog_connection"],
connection_options={
"dbtable": f"{args['target_schema']}.{redshift_table_name}",
"database": args["target_database"],
"preactions": f"truncate table {args['target_schema']}.{redshift_table_name};",
},
redshift_tmp_dir=temp_dir,
transformation_ctx="stores_redshiftSink",
) ```
Run Code Online (Sandbox Code Playgroud)
您可以执行以下操作来加快此过程
现在假设您有 100 个表要摄取,您可以将列表分成 10 个表,并同时运行该作业 10 次。
由于您的数据将并行加载,因此 Glue 作业运行的时间将减少,因此会产生更少的成本。
更快的替代方法是直接使用 redshift 实用程序。
为什么这种方法会更快?因为 spark redshift jdbc 连接器首先将 spark 数据帧卸载到 s3,然后准备复制命令到 redshift 表。在直接运行 copy 命令时,您将消除运行 unload 命令并将数据读入 spark df 的开销。