voi*_*oid 14 postgresql scala dataframe apache-spark apache-spark-sql
我正在使用Apache Spark DataFrames连接两个数据源并将结果作为另一个DataFrame获取.我想将结果写入另一个Postgres表.我看到这个选项:
myDataFrame.write.jdbc(url, table, connectionProperties)
Run Code Online (Sandbox Code Playgroud)
但是,我想要做的是根据表的主键将数据帧放入表中.怎么做?我正在使用Spark 1.6.0.
zer*_*323 17
它不受支持.DataFrameWriter可以附加或覆盖现有表.如果您的应用程序需要更复杂的逻辑,则必须手动处理.
一种选择是使用带有标准JDBC连接的操作(foreach,foreachPartition).另一个是写入临时文件并直接在数据库中处理其余部分.
jst*_*ill 13
KrisP有权利.进行upsert的最佳方法不是通过准备好的声明.重要的是要注意,此方法将一次插入一个具有与您拥有的工作者数量一样多的分区的方法.如果你想批量做这件事你也可以
import java.sql._
dataframe.coalesce("NUMBER OF WORKERS").mapPartitions((d) => Iterator(d)).foreach { batch =>
val dbc: Connection = DriverManager.getConnection("JDBCURL")
val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")
batch.grouped("# Of Rows you want per batch").foreach { session =>
session.foreach { x =>
st.setDouble(1, x.getDouble(1))
st.addBatch()
}
st.executeBatch()
}
dbc.close()
}
Run Code Online (Sandbox Code Playgroud)
这将为每个worker执行批处理并关闭DB连接.它可以让您控制工人数量,批次数量,并允许您在这些范围内工作.
如果您打算手动完成并通过zero323提到的选项1,那么您应该在这里查看插入语句的 Spark 源代码
def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
val columns = rddSchema.fields.map(_.name).mkString(",")
val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
conn.prepareStatement(sql)
}
Run Code Online (Sandbox Code Playgroud)
该PreparedStatement是的一部分java.sql,它有类似的方法execute()和executeUpdate().sql当然,您仍然需要相应地修改.