SPARK SQL - 使用DataFrames和JDBC更新MySql表

nic*_*ola 19 jdbc apache-spark apache-spark-sql

我正在尝试使用Spark SQL DataFrames和JDBC连接在MySql上插入和更新一些数据.

我已成功使用SaveMode.Append插入新数据.有没有办法从Spark SQL更新MySql Table中已存在的数据?

我要插入的代码是:

myDataFrame.write.mode(SaveMode.Append).jdbc(JDBCurl,mySqlTable,connectionProperties)

如果我改为SaveMode.Overwrite它会删除整个表并创建一个新表,我正在寻找像MySql中可用的"ON DUPLICATE KEY UPDATE"之类的东西

zer*_*323 21

这不可能.至于现在(Spark 1.6.0/2.2.0 SNAPSHOT)Spark DataFrameWriter只支持四种写入模式:

  • SaveMode.Overwrite:覆盖现有数据.
  • SaveMode.Append:追加数据.
  • SaveMode.Ignore:忽略操作(即无操作).
  • SaveMode.ErrorIfExists:default选项,在运行时抛出异常.

您可以手动插入,例如使用mapPartitions(因为您希望UPSERT操作应该是幂等的,因此易于实现),写入临时表并手动执行upsert,或使用触发器.

通常,实现批处理操作的upsert行为并保持良好的性能远非微不足道.您必须记住,在一般情况下,将存在多个并发事务(每个分区一个),因此您必须确保不会发生写入冲突(通常使用特定于应用程序的分区)或提供适当的恢复过程.实际上,最好执行并批量写入临时表并直接在数据库中解析upsert部分.


Ayd*_* K. 6

遗憾的是SaveMode.Upsert,Spark 中没有用于诸如更新插入之类的常见情况的模式。

zero322 总体上是正确的,但我认为应该可以(在性能上有所妥协)提供这样的替换功能。

我还想为此案例提供一些 Java 代码。当然,它的性能不如 spark 内置的那么好 - 但它应该是满足您要求的良好基础。只需根据您的需要修改它:

myDF.repartition(20); //one connection per partition, see below

myDF.foreachPartition((Iterator<Row> t) -> {
            Connection conn = DriverManager.getConnection(
                    Constants.DB_JDBC_CONN,
                    Constants.DB_JDBC_USER,
                    Constants.DB_JDBC_PASS);

            conn.setAutoCommit(true);
            Statement statement = conn.createStatement();

            final int batchSize = 100000;
            int i = 0;
            while (t.hasNext()) {
                Row row = t.next();
                try {
                    // better than REPLACE INTO, less cycles
                    statement.addBatch(("INSERT INTO mytable " + "VALUES ("
                            + "'" + row.getAs("_id") + "', 
                            + "'" + row.getStruct(1).get(0) + "'
                            + "')  ON DUPLICATE KEY UPDATE _id='" + row.getAs("_id") + "';"));
                    //conn.commit();

                    if (++i % batchSize == 0) {
                        statement.executeBatch();
                    }
                } catch (SQLIntegrityConstraintViolationException e) {
                    //should not occur, nevertheless
                    //conn.commit();
                } catch (SQLException e) {
                    e.printStackTrace();
                } finally {
                    //conn.commit();
                    statement.executeBatch();
                }
            }
            int[] ret = statement.executeBatch();

            System.out.println("Ret val: " + Arrays.toString(ret));
            System.out.println("Update count: " + statement.getUpdateCount());
            conn.commit();

            statement.close();
            conn.close();
Run Code Online (Sandbox Code Playgroud)

  • 这对我来说非常有效。我必须做的一个小修正是在 `statement.close();` 之前注释掉 `conn.commit();` 行。否则,它会抛出此错误“java-sql-sqlexception-cant-call-commit-when-autocommit-true”。 (2认同)