feb*_*har 37 scala psql apache-spark
我想知道如何在scala中执行以下操作?
我知道使用scala但是如何在打包时将psql scala的连接器jar导入到sbt中?
Dan*_*bos 43
我们的目标是从Spark工作者运行并行SQL查询.
将连接器和JDBC添加到libraryDependencies中build.sbt.我只用MySQL试过这个,所以我会在我的例子中使用它,但Postgres应该是一样的.
libraryDependencies ++= Seq(
jdbc,
"mysql" % "mysql-connector-java" % "5.1.29",
"org.apache.spark" %% "spark-core" % "1.0.1",
// etc
)
Run Code Online (Sandbox Code Playgroud)
在创建时,SparkContext告诉它要将哪些罐子复制到执行器.包括连接器jar.一个好看的方式来做到这一点:
val classes = Seq(
getClass, // To get the jar with our own code.
classOf[mysql.jdbc.Driver] // To get the connector.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)
Run Code Online (Sandbox Code Playgroud)
现在Spark已准备好连接到数据库.每个执行程序都将运行部分查询,以便结果可以进行分布式计算.
有两种选择.较旧的方法是使用org.apache.spark.rdd.JdbcRDD:
val rdd = new org.apache.spark.rdd.JdbcRDD(
sc,
() => {
sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
},
"SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
0, 1000, 10,
row => row.getString("BOOK_TITLE")
)
Run Code Online (Sandbox Code Playgroud)
查看参数文档.简述:
SparkContext.SELECT * FROM FOO WHERE 0 <= KEY AND KEY <= 100在示例中执行.ResultSet为某些东西.在示例中,我们将其转换为a String,因此您最终会得到一个RDD[String].自Apache Spark版本1.3.0起,另一种方法可通过DataFrame API获得.而不是JdbcRDD你会创建一个org.apache.spark.sql.DataFrame:
val df = sqlContext.load("jdbc", Map(
"url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
"dbtable" -> "BOOKS"))
Run Code Online (Sandbox Code Playgroud)
有关选项的完整列表,请参阅https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases(可以设置密钥范围和分区数量)喜欢JdbcRDD).
JdbcRDD不支持更新.但你可以简单地做到foreachPartition.
rdd.foreachPartition { it =>
val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
for (bookTitle <- it) {
del.setString(1, bookTitle)
del.executeUpdate
}
}
Run Code Online (Sandbox Code Playgroud)
(这会为每个分区创建一个连接.如果这是一个问题,请使用连接池!)
DataFrame通过createJDBCTable和insertIntoJDBC方法支持更新.
| 归档时间: |
|
| 查看次数: |
36749 次 |
| 最近记录: |