nic*_*ola 19 jdbc apache-spark apache-spark-sql
我正在尝试使用Spark SQL DataFrames和JDBC连接在MySql上插入和更新一些数据.
我已成功使用SaveMode.Append插入新数据.有没有办法从Spark SQL更新MySql Table中已存在的数据?
我要插入的代码是:
myDataFrame.write.mode(SaveMode.Append).jdbc(JDBCurl,mySqlTable,connectionProperties)
如果我改为SaveMode.Overwrite它会删除整个表并创建一个新表,我正在寻找像MySql中可用的"ON DUPLICATE KEY UPDATE"之类的东西
zer*_*323 21
这不可能.至于现在(Spark 1.6.0/2.2.0 SNAPSHOT)Spark DataFrameWriter
只支持四种写入模式:
SaveMode.Overwrite
:覆盖现有数据.SaveMode.Append
:追加数据.SaveMode.Ignore
:忽略操作(即无操作).SaveMode.ErrorIfExists
:default选项,在运行时抛出异常.
您可以手动插入,例如使用mapPartitions
(因为您希望UPSERT操作应该是幂等的,因此易于实现),写入临时表并手动执行upsert,或使用触发器.
通常,实现批处理操作的upsert行为并保持良好的性能远非微不足道.您必须记住,在一般情况下,将存在多个并发事务(每个分区一个),因此您必须确保不会发生写入冲突(通常使用特定于应用程序的分区)或提供适当的恢复过程.实际上,最好执行并批量写入临时表并直接在数据库中解析upsert部分.
遗憾的是SaveMode.Upsert
,Spark 中没有用于诸如更新插入之类的常见情况的模式。
zero322 总体上是正确的,但我认为应该可以(在性能上有所妥协)提供这样的替换功能。
我还想为此案例提供一些 Java 代码。当然,它的性能不如 spark 内置的那么好 - 但它应该是满足您要求的良好基础。只需根据您的需要修改它:
myDF.repartition(20); //one connection per partition, see below
myDF.foreachPartition((Iterator<Row> t) -> {
Connection conn = DriverManager.getConnection(
Constants.DB_JDBC_CONN,
Constants.DB_JDBC_USER,
Constants.DB_JDBC_PASS);
conn.setAutoCommit(true);
Statement statement = conn.createStatement();
final int batchSize = 100000;
int i = 0;
while (t.hasNext()) {
Row row = t.next();
try {
// better than REPLACE INTO, less cycles
statement.addBatch(("INSERT INTO mytable " + "VALUES ("
+ "'" + row.getAs("_id") + "',
+ "'" + row.getStruct(1).get(0) + "'
+ "') ON DUPLICATE KEY UPDATE _id='" + row.getAs("_id") + "';"));
//conn.commit();
if (++i % batchSize == 0) {
statement.executeBatch();
}
} catch (SQLIntegrityConstraintViolationException e) {
//should not occur, nevertheless
//conn.commit();
} catch (SQLException e) {
e.printStackTrace();
} finally {
//conn.commit();
statement.executeBatch();
}
}
int[] ret = statement.executeBatch();
System.out.println("Ret val: " + Arrays.toString(ret));
System.out.println("Update count: " + statement.getUpdateCount());
conn.commit();
statement.close();
conn.close();
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
14358 次 |
最近记录: |