Ati*_*zad 5 scala apache-spark
我正在开发一个使用 Spark 和 scala 的项目,我对两者都是新手,但是在 stackoverflow 的帮助下,我已经完成了所有数据处理并将处理后的数据存储在 mysql 中。现在我终于遇到了一个问题,我不知道如何解决它。当我第一次处理数据时,我使用这种方法存储数据帧,并且第一个时间表是空的。
df.write.mode("append").jdbc("dburl", "tablename", "dbproperties");
Run Code Online (Sandbox Code Playgroud)
假设我处理的数据在数据库中看起来像这样。
id name eid number_of_visitis last_visit_date
1 John C110 12 2016-01-13 00:00:00
2 Root C111 24 2016-04-27 00:00:00
3 Michel C112 8 2016-07-123 00:00:00
4 Jonny C113 45 2016-06-10 00:00:00
Run Code Online (Sandbox Code Playgroud)
现在,名为“Root”且 eid 为“C111”的人于“2016-08-30 00:00:00”访问办公室 2 次,在处理此新数据后,我只需要更新数据库中的此人记录。我将如何做到这一点。现在更新后的表应该如下所示。
id name eid number_of_visitis last_visit_date
1 John C110 12 2016-01-13 00:00:00
2 Root C111 26 2016-08-30 00:00:00
3 Michel C112 8 2016-07-123 00:00:00
4 Jonny C113 45 2016-06-10 00:00:00
Run Code Online (Sandbox Code Playgroud)
我在这个表中有数百万条数据,如果我在 Spark DataFrame 中加载完整的表并更新所需的记录,那么它将花费更多的时间,而且它没有意义,因为为什么当我只想更新一个时我加载完整的表row.I 尝试了此代码,但它将新行添加到表中而不是更新行。
df.write.mode("append").jdbc("dburl", "tablename", "dbproperties");
Run Code Online (Sandbox Code Playgroud)
有什么办法可以在火花中做到这一点吗?
我在互联网上看到过这个,我可以这样做更新吗?
val numParallelInserts = 10
val batchSize = 1000
new CoalescedRDD(sessions, numParallelInserts) mapPartitionsWithSplit { (split, iter) => Iterator((split, iter)) } foreach { case (split, iter) =>
val db = connect()
val sql = "INSERT INTO sessions (id, ts) VALUES (?, ?)"
val stmt = db.prepareStatement(sql)
iter.grouped(batchSize).zipWithIndex foreach { case (batch, batchIndex) =>
batch foreach { session =>
stmt.setString(1, session.id)
stmt.setString(2, TimestampFormat.print(session.ts))
stmt.addBatch()
}
stmt.executeBatch()
db.commit();
logInfo("Split " + (split+1) + "/" + numParallelInserts + " inserted batch " + batchIndex + " with " + batch.size + " elements")
}
db.close();
Run Code Online (Sandbox Code Playgroud)
您可以尝试使用 sql 来做到这一点。将更新的(甚至是新的)数据存储在新的临时表中,然后将临时表合并到主表中。
一种方法是 -
使用临时表更新主表中的所有记录
update main_table
set visits = main_table.visits + temp_table.visits
from temp_table
where main_table.eid = temp_table.eid;
从临时表中删除所有重复记录(仅在临时表中保留新记录)
delete from temp_table where main_table.eid = temp_table.eid;
将临时表中的所有记录插入主表
insert into main_table select * from temp_table;
删除临时表
drop table temp_table;
| 归档时间: |
|
| 查看次数: |
7671 次 |
| 最近记录: |