Mat*_*son 1 apache-spark apache-spark-sql pyspark aws-glue
我继承了一些在 AWS Glue 上运行速度极其缓慢的代码。
在作业中,它创建了许多动态框架,然后使用spark.sql. 从 MySQL 和 Postgres 数据库读取表,然后使用 Glue 将它们连接在一起,最终将另一个表写回 Postgres。
示例(注意 dbs 等已被重命名和简化,因为我无法直接粘贴实际代码)
jobName = args['JOB_NAME']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(jobName, args)
# MySQL
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "trans").toDF().createOrReplaceTempView("trans")
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "types").toDF().createOrReplaceTempView("types")
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "currency").toDF().createOrReplaceTempView("currency")
# DB2 (Postgres)
glueContext.create_dynamic_frame.from_catalog(database = "db2", table_name = "watermark").toDF().createOrReplaceTempView("watermark")
# transactions
new_transactions_df = spark.sql("[SQL CODE HERE]")
# Write to DB
conf_g = glueContext.extract_jdbc_conf("My DB")
url = conf_g["url"] + "/reporting"
new_transactions_df.write.option("truncate", "true").jdbc(url, "staging.transactions", properties=conf_g, mode="overwrite")
Run Code Online (Sandbox Code Playgroud)
[SQL CODE HERE]实际上是一个简单的 select 语句,将三个表连接在一起以生成输出,然后将其写入 staging.transactions 表。
当我上次运行这个时,它只写入了 150 行,但花了 9 分钟才完成。有人可以指出我如何优化这个的方向吗?
附加信息:
通常,当使用 JDBC 驱动程序在 Spark 中读取/写入数据时,常见问题是操作未并行化。以下是您可能想要尝试的一些优化:
从您提供的代码来看,似乎所有表数据都是使用一个查询和一个 Spark 执行器读取的。
如果您直接使用 Spark DataFrame Reader,则可以设置选项partitionColumn, lowerBound, upperBound,fetchSize来使用多个工作线程并行读取多个分区,如文档中所述。例子:
spark.read.format("jdbc") \
#...
.option("partitionColumn", "partition_key") \
.option("lowerBound", "<lb>") \
.option("upperBound", "<ub>") \
.option("numPartitions", "<np>") \
.option("fetchsize", "<fs>")
Run Code Online (Sandbox Code Playgroud)
使用读分区时,请注意 Spark 将并行发出多个查询,因此请确保数据库引擎支持它并优化索引,特别是为了partition_column避免整个表扫描。
在 AWS Glue 中,可以通过使用参数传递其他选项来完成此操作additional_options:
要使用执行并行读取的 JDBC 连接,您可以设置
hashfield、hashexpression或hashpartitions选项:
glueContext.create_dynamic_frame.from_catalog(
database = "db1",
table_name = "trans",
additional_options = {"hashfield": "transID", "hashpartitions": "10"}
).toDF().createOrReplaceTempView("trans")
Run Code Online (Sandbox Code Playgroud)
Glue 文档对此进行了描述:Reading from JDBC Tables in Parallel
batchsize选项:在您的特定情况下,不确定这是否有帮助,因为您只写入 150 行,但您可以指定此选项来提高写入性能:
new_transactions_df.write.format('jdbc') \
# ...
.option("batchsize", "10000") \
.save()
Run Code Online (Sandbox Code Playgroud)
您还可以通过将某些查询(过滤器、列选择)直接推送到数据库引擎来优化读取,而不是将整个表加载到动态框架中然后进行过滤。
在 Glue 中,可以使用参数来完成此操作push_down_predicate:
glueContext.create_dynamic_frame.from_catalog(
database = "db1",
table_name = "trans",
push_down_predicate = "(transDate > '2021-01-01' and transStatus='OK')"
).toDF().createOrReplaceTempView("trans")
Run Code Online (Sandbox Code Playgroud)
在某些情况下,您可以考虑使用数据库引擎将表导出到文件中,然后从文件中读取。同样意味着写入时,首先写入文件,然后使用 db 批量插入命令。这可以避免使用 Spark 和 JDBC 的瓶颈。
| 归档时间: |
|
| 查看次数: |
7303 次 |
| 最近记录: |