Aro*_*mal 6 mysql amazon-web-services amazon-aurora aws-glue aws-glue-data-catalog
我使用如下所示的视觉选项卡创建了一个粘合作业。首先,我连接到一个 mysql 表作为数据源,该表已经在我的数据目录中。然后在转换节点中,我编写了一个自定义 SQL 查询以仅从源表中选择一列。通过数据预览功能进行验证,转换节点工作正常。现在我想将数据写入只有一列“字符串”数据类型的现有数据库表。粘合作业成功,但我没有看到表中的数据。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame
def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
for alias, frame in mapping.items():
frame.toDF().createOrReplaceTempView(alias)
result = spark.sql(query)
return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node MySQL
MySQL_node1650299412376 = glueContext.create_dynamic_frame.from_catalog(
database="glue_rds_test",
table_name="test_customer",
transformation_ctx="MySQL_node1650299412376",
)
# Script generated for node SQL
SqlQuery0 = """
select CUST_CODE from customer
"""
SQL_node1650302847690 = sparkSqlQuery(
glueContext,
query=SqlQuery0,
mapping={"customer": MySQL_node1650299412376},
transformation_ctx="SQL_node1650302847690",
)
# Script generated for node MySQL
MySQL_node1650304163076 = glueContext.write_dynamic_frame.from_catalog(
frame=SQL_node1650302847690,
database="glue_rds_test",
table_name="test_customer2",
transformation_ctx="MySQL_node1650304163076",
)
job.commit()
Run Code Online (Sandbox Code Playgroud)
对我来说,问题是 SQL 查询中所选字段的双引号。放弃使用双引号解决了这个问题。Spark SQL语法文档中没有提及它
例如,我“错误地”使用了以下查询语法:
select "CUST_CODE" from customer
Run Code Online (Sandbox Code Playgroud)
而不是这个“正确”的:
select CUST_CODE from customer
Run Code Online (Sandbox Code Playgroud)
您共享的示例代码似乎没有这个语法问题,但我认为将答案放在这里可能对其他人有帮助。
| 归档时间: |
|
| 查看次数: |
1418 次 |
| 最近记录: |