ben*_*eni 3 amazon-redshift aws-glue
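Is it possible to load multiple tables into Redshift with an AWS Glue job?
These are the steps I followed.
I am not familiar with Python and I am new to AWS Glue, but I have several tables that need to be loaded.
Here is a sample script: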
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "sampledb", table_name = "abs", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("value", "int", "value", "int"), ("sex", "string", "sex", "string"), ("age", "string", "age", "string"), ("highest year of school completed", "string", "highest year of school completed", "string"), ("state", "string", "state", "string"), ("region type", "string", "region type", "string"), ("lga 2011", "string", "lga 2011", "string"), ("frequency", "string", "frequency", "string"), ("time", "string", "time", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("value", "int", "value", "int"), ("sex", "string", "sex", "string"), ("age", "string", "age", "string"), ("highest year of school completed", "string", "highest year of school completed", "string"), ("state", "string", "state", "string"), ("region type", "string", "region type", "string"), ("lga 2011", "string", "lga 2011", "string"), ("frequency", "string", "frequency", "string"), ("time", "string", "time", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [catalog_connection = "redshift", connection_options = {"dbtable": "abs", "database": "dbmla"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "redshift", connection_options = {"dbtable": "abs", "database": "dbmla"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()
AWS Glue database: sampledb
Table name: abs
Redshift database: dbmla
Could you point me to an example of how to load the other tables? Thanks!
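According to the AWS Glue FAQ, you can modify the generated code and run the job.
Q: How can I customize the ETL code generated by AWS Glue?
AWS Glue's ETL script recommendation system generates Scala or Python code. It leverages Glue's custom ETL library to simplify access to data sources as well as manage job execution. You can find more details about the library in our documentation. You can write ETL code using AWS Glue's custom library or write arbitrary code in Scala or Python by using inline editing via the AWS Glue Console script editor, downloading the auto-generated code, and editing it in your own IDE. You can also start with one of the many samples hosted in our Github repository and customize that code.
So, try adding the code segments for the other tables to the same script, as shown below: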
# Table "abs2": same steps as the generated script, with variable names and
# transformation_ctx values that are not used anywhere else in the job.
datasource_abs2 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs2", transformation_ctx = "datasource_abs2")
# ".." is left elided as in the original answer: supply this table's own column mappings.
applymapping_abs2 = ApplyMapping.apply(frame = datasource_abs2, mappings = [..], transformation_ctx = "applymapping_abs2")
resolvechoice_abs2 = ResolveChoice.apply(frame = applymapping_abs2, choice = "make_cols", transformation_ctx = "resolvechoice_abs2")
dropnullfields_abs2 = DropNullFields.apply(frame = resolvechoice_abs2, transformation_ctx = "dropnullfields_abs2")
datasink_abs2 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields_abs2, catalog_connection = "redshift", connection_options = {"dbtable": "abs2", "database": "dbmla"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink_abs2")
# Further tables repeat the block. "abs3" and "abs4" are hypothetical table names
# (the original answer reused "abs2" in every block); replace them with your own.
datasource_abs3 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs3", transformation_ctx = "datasource_abs3")
applymapping_abs3 = ApplyMapping.apply(frame = datasource_abs3, mappings = [..], transformation_ctx = "applymapping_abs3")
resolvechoice_abs3 = ResolveChoice.apply(frame = applymapping_abs3, choice = "make_cols", transformation_ctx = "resolvechoice_abs3")
dropnullfields_abs3 = DropNullFields.apply(frame = resolvechoice_abs3, transformation_ctx = "dropnullfields_abs3")
datasink_abs3 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields_abs3, catalog_connection = "redshift", connection_options = {"dbtable": "abs3", "database": "dbmla"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink_abs3")
datasource_abs4 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs4", transformation_ctx = "datasource_abs4")
applymapping_abs4 = ApplyMapping.apply(frame = datasource_abs4, mappings = [..], transformation_ctx = "applymapping_abs4")
resolvechoice_abs4 = ResolveChoice.apply(frame = applymapping_abs4, choice = "make_cols", transformation_ctx = "resolvechoice_abs4")
dropnullfields_abs4 = DropNullFields.apply(frame = resolvechoice_abs4, transformation_ctx = "dropnullfields_abs4")
datasink_abs4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields_abs4, catalog_connection = "redshift", connection_options = {"dbtable": "abs4", "database": "dbmla"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink_abs4")
# Commit once, after all tables have been written.
job.commit()
Make sure the variable names (and the transformation_ctx values) are unique for each table. Thanks.
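When many tables follow the same pattern, the repeated blocks could also be written as a loop. The following is only a sketch and not part of the original answer: `build_load_plan` and `load_tables` are hypothetical helper names, it assumes each Redshift table has the same name as its catalog table, and it skips the ApplyMapping, ResolveChoice and DropNullFields steps because the column mappings differ per table. It relies only on the two GlueContext calls already used in the script above and takes the context as a parameter.

```python
from typing import Any, Dict, List


def build_load_plan(tables: List[str], glue_database: str,
                    redshift_database: str) -> List[Dict[str, str]]:
    """Return one entry per table, each with its own transformation_ctx names."""
    plan = []
    for table in tables:
        plan.append({
            "glue_database": glue_database,
            "table": table,
            "redshift_database": redshift_database,
            # Suffix each context with the table name so no two steps share one.
            "source_ctx": f"datasource_{table}",
            "sink_ctx": f"datasink_{table}",
        })
    return plan


def load_tables(glue_context: Any, plan: List[Dict[str, str]],
                connection_name: str, tmp_dir: str) -> List[str]:
    """Read each catalog table and write it to Redshift; return the tables written."""
    loaded = []
    for step in plan:
        # Read the table from the Glue Data Catalog.
        frame = glue_context.create_dynamic_frame.from_catalog(
            database=step["glue_database"],
            table_name=step["table"],
            transformation_ctx=step["source_ctx"],
        )
        # Per-table transforms (ApplyMapping and so on) would go here.
        # Write the frame to Redshift through the named Glue connection.
        glue_context.write_dynamic_frame.from_jdbc_conf(
            frame=frame,
            catalog_connection=connection_name,
            connection_options={
                "dbtable": step["table"],  # assumption: same name in Redshift
                "database": step["redshift_database"],
            },
            redshift_tmp_dir=tmp_dir,
            transformation_ctx=step["sink_ctx"],
        )
        loaded.append(step["table"])
    return loaded
```

Inside the job, after `job.init(...)`, this might be called as `load_tables(glueContext, build_load_plan(["abs", "abs2"], "sampledb", "dbmla"), "redshift", args["TempDir"])`, followed by a single `job.commit()`.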