Man*_*ani 2 python mysql amazon-s3 amazon-web-services aws-glue
我对 AWS 比较陌生,所以我不知道如何去做,
我在 s3 上有 CSV 文件,并且已经在 RDS 上设置了 Aurora 实例。我无法弄清楚的是如何自动批量加载数据,本质上就像LOAD DATA FROM s3使用 AWS Glue 之类的东西一样。
我还使用了 s3 到 RDS 的 Glue 原生功能,但它本质上是通过 JDBC 连接向 RDS 进行一堆插入,这对于大型数据集来说也非常慢。
我可以在 RDS 上独立运行命令来完成此操作,但我不想这样做,而是想利用 Glue。我还考虑过使用适用于 Python 的 MySQL 连接器,但 Glue 本身仅支持 Python 2.7,这是我不想使用的东西。
任何帮助将不胜感激。
该方法如上所述,有一个 S3 事件触发器和一个侦听 s3 存储桶/对象位置的 lambda 作业。文件上传到 s3 位置后,lambda 作业就会运行,并且在 lambda 中,您可以配置为调用 AWS Glue 作业。这正是我们所做的,并且已经成功上线。Lambda 的寿命为 15 分钟,触发/启动 Glue 作业的时间应该不到一分钟。
请在此找到示例来源以供参考。
from __future__ import print_function
import json
import boto3
import time
import urllib
print('Loading function')
s3 = boto3.client('s3')
glue = boto3.client('glue')
def lambda_handler(event, context):
gluejobname="your-glue-job-name here"
try:
runId = glue.start_job_run(JobName=gluejobname)
status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
print("Job Status : ", status['JobRun']['JobRunState'])
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist '
'and your bucket is in the same region as this '
'function.'.format(source_bucket, source_bucket))
raise e
Run Code Online (Sandbox Code Playgroud)
要创建 Lambda 函数,请转至 AWS Lambdra->从头开始创建新函数->为事件选择 S3,然后根据需要配置 S3 存储桶位置和前缀。然后复制粘贴上面的代码示例,内联代码区域,并根据需要配置胶水作业名称。请确保您拥有所有必需的 IAM 角色/访问设置。
胶水作业应该能够连接到您的 Aurora,然后您可以使用 Aurora 提供的“LOAD FROM S3.....”命令。确保所有参数组设置/配置均根据需要完成。
如有任何问题请告诉我。
更新:从 S3 加载的示例代码片段:
conn = mysql.connector.connect(host=url, user=uname, password=pwd, database=dbase)
cur = conn.cursor()
cur, conn = connect()
createStgTable1 = "DROP TABLE IF EXISTS mydb.STG_TABLE;"
createStgTable2 = "CREATE TABLE mydb.STG_TABLE(COL1 VARCHAR(50) NOT NULL, COL2 VARCHAR(50), COL3 VARCHAR(50), COL4 CHAR(1) NOT NULL);"
loadQry = "LOAD DATA FROM S3 PREFIX 's3://<bucketname>/folder' REPLACE INTO TABLE mydb.STG_TABLE FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' IGNORE 1 LINES (@var1, @var2, @var3, @var4) SET col1= @var1, col2= @var2, col3= @var3, col4=@var4;"
cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6594 次 |
| 最近记录: |