使用AWS Glue覆盖MySQL表

Joe*_*oeC 6 mysql amazon-web-services pyspark aws-glue

我有一个lambda进程,偶尔会轮询API以获取最新数据.这个数据有唯一的密钥,我想用Glue来更新MySQL中的表.是否可以使用此密钥覆盖数据?(类似于Spark的模式=覆盖).如果没有 - 我可以在插入所有新数据之前截断Glue中的表吗?

谢谢

hoa*_*axz 5

我在使用 Redshift 时遇到了同样的问题,我们能想到的最佳解决方案是创建一个 Java 类来加载 MySQL 驱动程序并发出截断表:

package com.my.glue.utils.mysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

@SuppressWarnings("unused")
public class MySQLTruncateClient {
    public void truncate(String tableName, String url) throws SQLException, ClassNotFoundException {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection mysqlConnection = DriverManager.getConnection(url);
            Statement statement = mysqlConnection.createStatement()) {
            statement.execute(String.format("TRUNCATE TABLE %s", tableName));
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

将该 JAR 连同您的 MySQL Jar 依赖项一起上传到 S3,并使您的工作依赖于它们。在 PySpark 脚本中,您可以使用以下命令加载 truncate 方法:

java_import(glue_context._jvm, "com.my.glue.utils.mysql.MySQLTruncateClient")
truncate_client = glue_context._jvm.MySQLTruncateClient()
truncate_client.truncate('my_table', 'jdbc:mysql://...')
Run Code Online (Sandbox Code Playgroud)

  • 感谢您的解决方法:)我觉得他们应该解决这个问题 (2认同)
  • 感谢您的解决方法,这帮助我们连接到 SQL Server,以便在从 Glue 加载到表之前和之后运行查询。 (2认同)

Roh*_*mar 5

我发现了一种在 Glue 中使用 JDBC 连接的更简单的方法。当您将数据写入 Redshift 集群时,Glue 团队建议通过以下示例代码截断表:

datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = resolvechoice4, catalog_connection = "<connection-name>", connection_options = {"dbtable": "<target-table>", "database": "testdb", "preactions":"TRUNCATE TABLE <table-name>"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
Run Code Online (Sandbox Code Playgroud)

在哪里

connection-name your Glue connection name to your Redshift Cluster
target-table    the table you're loading the data in 
testdb          name of the database 
table-name      name of the table to truncate (ideally the table you're loading into)
Run Code Online (Sandbox Code Playgroud)


Joe*_*oeC 4

我想出的解决方法比发布的替代方法稍微简单一些,如下所示:

  • 在 mysql 中创建一个临时表,并将新数据加载到该表中。
  • 运行命令:REPLACE INTO myTable SELECT * FROM myStagingTable;
  • 截断暂存表

这可以通过以下方式完成:

import sys from awsglue.transforms
import * from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

import pymysql
pymysql.install_as_MySQLdb()
import MySQLdb
db = MySQLdb.connect("URL", "USERNAME", "PASSWORD", "DATABASE")
cursor = db.cursor()
cursor.execute("REPLACE INTO myTable SELECT * FROM myStagingTable")
cursor.fetchall()

db.close()
job.commit()
Run Code Online (Sandbox Code Playgroud)