使用 Scala 更新 Spark 数据库中的数据

Ati*_*zad 5 scala apache-spark

我正在开发一个使用 Spark 和 scala 的项目,我对两者都是新手,但是在 stackoverflow 的帮助下,我已经完成了所有数据处理并将处理后的数据存储在 mysql 中。现在我终于遇到了一个问题,我不知道如何解决它。当我第一次处理数据时,我使用这种方法存储数据帧,并且第一个时间表是空的。

      df.write.mode("append").jdbc("dburl", "tablename", "dbproperties"); 
Run Code Online (Sandbox Code Playgroud)

假设我处理的数据在数据库中看起来像这样。

      id      name       eid      number_of_visitis    last_visit_date
      1       John       C110     12                   2016-01-13 00:00:00
      2       Root       C111     24                   2016-04-27 00:00:00
      3       Michel     C112     8                    2016-07-123 00:00:00
      4       Jonny      C113     45                   2016-06-10 00:00:00
Run Code Online (Sandbox Code Playgroud)

现在,名为“Root”且 eid 为“C111”的人于“2016-08-30 00:00:00”访问办公室 2 次,在处理此新数据后,我只需要更新数据库中的此人记录。我将如何做到这一点。现在更新后的表应该如下所示。

      id      name       eid      number_of_visitis    last_visit_date
      1       John       C110     12                   2016-01-13 00:00:00
      2       Root       C111     26                   2016-08-30  00:00:00
      3       Michel     C112     8                    2016-07-123 00:00:00
      4       Jonny      C113     45                   2016-06-10 00:00:00
Run Code Online (Sandbox Code Playgroud)

我在这个表中有数百万条数据,如果我在 Spark DataFrame 中加载完整的表并更新所需的记录,那么它将花费更多的时间,而且它没有意义,因为为什么当我只想更新一个时我加载完整的表row.I 尝试了此代码,但它将新行添加到表中而不是更新行。

       df.write.mode("append").jdbc("dburl", "tablename", "dbproperties");
Run Code Online (Sandbox Code Playgroud)

有什么办法可以在火花中做到这一点吗?

我在互联网上看到过这个,我可以这样做更新吗?

val numParallelInserts = 10
val batchSize = 1000

new CoalescedRDD(sessions, numParallelInserts) mapPartitionsWithSplit { (split, iter) => Iterator((split, iter)) } foreach { case (split, iter) =>
  val db = connect()

  val sql = "INSERT INTO sessions (id, ts) VALUES (?, ?)"
  val stmt = db.prepareStatement(sql)

  iter.grouped(batchSize).zipWithIndex foreach { case (batch, batchIndex) =>
    batch foreach { session =>
      stmt.setString(1, session.id)
      stmt.setString(2, TimestampFormat.print(session.ts))
      stmt.addBatch()
    }
    stmt.executeBatch()
    db.commit();
    logInfo("Split " + (split+1) + "/" + numParallelInserts + " inserted batch " + batchIndex + " with " + batch.size + " elements")
  }

  db.close();
Run Code Online (Sandbox Code Playgroud)

Kak*_*aji 3

您可以尝试使用 sql 来做到这一点。将更新的(甚至是新的)数据存储在新的临时表中,然后将临时表合并到主表中。

一种方法是 -

  1. 使用临时表更新主表中的所有记录

    update main_table set visits = main_table.visits + temp_table.visits from temp_table where main_table.eid = temp_table.eid;

  2. 从临时表中删除所有重复记录(仅在临时表中保留新记录)

    delete from temp_table where main_table.eid = temp_table.eid;

  3. 将临时表中的所有记录插入主表

    insert into main_table select * from temp_table;

  4. 删除临时表

    drop table temp_table;