自动将数据从 s3 批量加载到 Aurora MySQL RDS 实例

Man*_*ani 2 python mysql amazon-s3 amazon-web-services aws-glue

我对 AWS 比较陌生,所以我不知道如何去做,

我在 s3 上有 CSV 文件,并且已经在 RDS 上设置了 Aurora 实例。我无法弄清楚的是如何自动批量加载数据,本质上就像LOAD DATA FROM s3使用 AWS Glue 之类的东西一样。

我还使用了 s3 到 RDS 的 Glue 原生功能,但它本质上是通过 JDBC 连接向 RDS 进行一堆插入,这对于大型数据集来说也非常慢。

我可以在 RDS 上独立运行命令来完成此操作,但我不想这样做,而是想利用 Glue。我还考虑过使用适用于 Python 的 MySQL 连接器,但 Glue 本身仅支持 Python 2.7,这是我不想使用的东西。

任何帮助将不胜感激。

Yuv*_*uva 5

该方法如上所述,有一个 S3 事件触发器和一个侦听 s3 存储桶/对象位置的 lambda 作业。文件上传到 s3 位置后,lambda 作业就会运行,并且在 lambda 中,您可以配置为调用 AWS Glue 作业。这正是我们所做的,并且已经成功上线。Lambda 的寿命为 15 分钟,触发/启动 Glue 作业的时间应该不到一分钟。

请在此找到示例来源以供参考。

from __future__ import print_function
import json
import boto3
import time
import urllib

print('Loading function')

s3 = boto3.client('s3')
glue = boto3.client('glue')

def lambda_handler(event, context):
    gluejobname="your-glue-job-name here"

    try:
        runId = glue.start_job_run(JobName=gluejobname)
        status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
        print("Job Status : ", status['JobRun']['JobRunState'])
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist '
              'and your bucket is in the same region as this '
              'function.'.format(source_bucket, source_bucket))
    raise e
Run Code Online (Sandbox Code Playgroud)

要创建 Lambda 函数,请转至 AWS Lambdra->从头开始创建新函数->为事件选择 S3,然后根据需要配置 S3 存储桶位置和前缀。然后复制粘贴上面的代码示例,内联代码区域,并根据需要配置胶水作业名称。请确保您拥有所有必需的 IAM 角色/访问设置。

胶水作业应该能够连接到您的 Aurora,然后您可以使用 Aurora 提供的“LOAD FROM S3.....”命令。确保所有参数组设置/配置均根据需要完成。

如有任何问题请告诉我。

更新:从 S3 加载的示例代码片段:

conn = mysql.connector.connect(host=url, user=uname, password=pwd, database=dbase)
cur = conn.cursor()
cur, conn = connect()
createStgTable1 = "DROP TABLE IF EXISTS mydb.STG_TABLE;"
createStgTable2 = "CREATE TABLE mydb.STG_TABLE(COL1 VARCHAR(50) NOT NULL, COL2 VARCHAR(50), COL3 VARCHAR(50), COL4 CHAR(1) NOT NULL);"
loadQry = "LOAD DATA FROM S3 PREFIX 's3://<bucketname>/folder' REPLACE INTO TABLE mydb.STG_TABLE FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' IGNORE 1 LINES (@var1, @var2, @var3, @var4) SET col1= @var1, col2= @var2, col3= @var3, col4=@var4;"
cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()
Run Code Online (Sandbox Code Playgroud)