Spark Dataframes UPSERT到Postgres表

voi*_*oid 14 postgresql scala dataframe apache-spark apache-spark-sql

我正在使用Apache Spark DataFrames连接两个数据源并将结果作为另一个DataFrame获取.我想将结果写入另一个Postgres表.我看到这个选项:

myDataFrame.write.jdbc(url, table, connectionProperties)
Run Code Online (Sandbox Code Playgroud)

但是,我想要做的是根据表的主键将数据帧放入表中.怎么做?我正在使用Spark 1.6.0.

zer*_*323 17

它不受支持.DataFrameWriter可以附加或覆盖现有表.如果您的应用程序需要更复杂的逻辑,则必须手动处理.

一种选择是使用带有标准JDBC连接的操作(foreach,foreachPartition).另一个是写入临时文件并直接在数据库中处理其余部分.


jst*_*ill 13

KrisP有权利.进行upsert的最佳方法不是通过准备好的声明.重要的是要注意,此方法将一次插入一个具有与您拥有的工作者数量一样多的分区的方法.如果你想批量做这件事你也可以

import java.sql._
dataframe.coalesce("NUMBER OF WORKERS").mapPartitions((d) => Iterator(d)).foreach { batch =>
  val dbc: Connection = DriverManager.getConnection("JDBCURL")
  val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")

  batch.grouped("# Of Rows you want per batch").foreach { session =>
    session.foreach { x =>
      st.setDouble(1, x.getDouble(1)) 
      st.addBatch()
    }
    st.executeBatch()
  }
  dbc.close()
}
Run Code Online (Sandbox Code Playgroud)

这将为每个worker执行批处理并关闭DB连接.它可以让您控制工人数量,批次数量,并允许您在这些范围内工作.

  • “mapPartitions(d => Iterator(d))”的用途是什么?这与“foreachPartition”相同吗? (2认同)
  • @jstuartmill我尝试了你的方法。我收到“Iterator”错误,这是错误“无法找到 Iterator[org.apache.spark.sql.Row] 类型的编码器”。需要一个隐式 Encoder[Iterator[org.apache.spark.sql.Row]] 来将 Iterator[org.apache.spark.sql.Row] 实例存储在数据集中。通过导入spark.implicits来支持基本类型(Int、String等)和产品类型(case类)。_在未来的版本中将添加对序列化其他类型的支持。`spark版本是2.4.4 (2认同)

Kri*_*isP 8

如果您打算手动完成并通过zero323提到的选项1,那么您应该在这里查看插入语句的 Spark 源代码

  def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
    val columns = rddSchema.fields.map(_.name).mkString(",")
    val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
    val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
    conn.prepareStatement(sql)
  }
Run Code Online (Sandbox Code Playgroud)

PreparedStatement的一部分java.sql,它有类似的方法execute()executeUpdate().sql当然,您仍然需要相应地修改.