在Apache Spark中连接到SQLite

flo*_*wit 3 sqlite scala apache-spark apache-spark-sql

我想在SQLite数据库中的所有表上运行自定义函数.该函数或多或少相同,但取决于各个表的模式.此外,表及其模式仅在运行时已知(使用指定数据库路径的参数调用程序).

这是我到目前为止:

val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// somehow bind sqlContext to DB

val allTables = sqlContext.tableNames

for( t <- allTables) {
    val df = sqlContext.table(t)
    val schema = df.columns
    sqlContext.sql("SELECT * FROM " + t + "...").map(x => myFunc(x,schema))
}
Run Code Online (Sandbox Code Playgroud)

到目前为止我发现的唯一提示需要提前知道该表,在我的场景中并非如此:

val tableData = 
  sqlContext.read.format("jdbc")
    .options(Map("url" -> "jdbc:sqlite:/path/to/file.db", "dbtable" -> t))
    .load()
Run Code Online (Sandbox Code Playgroud)

我正在使用xerial sqlite jdbc驱动程序.那么我怎样才能仅仅与数据库联系,而不是对表?

编辑:使用Beryllium的答案作为开始我将我的代码更新为:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val metaData = sqlContext.read.format("jdbc")
    .options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
                 "dbtable" -> "(SELECT * FROM sqlite_master) AS t")).load()

val myTableNames = metaData.select("tbl_name").distinct()

for (t <- myTableNames) {
    println(t.toString)

    val tableData = sqlContext.table(t.toString)

    for (record <- tableData.select("*")) {
        println(record)
    }
}
Run Code Online (Sandbox Code Playgroud)

至少我可以在运行时读取表名,这对我来说是一个巨大的进步.但我无法阅读表格.我试过了两个

val tableData = sqlContext.table(t.toString)
Run Code Online (Sandbox Code Playgroud)

val tableData = sqlContext.read.format("jdbc")
    .options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
                 "dbtable" -> t.toString)).load()
Run Code Online (Sandbox Code Playgroud)

在循环中,但在这两种情况下,我得到一个NullPointerException.虽然我可以打印表名,但似乎我无法连接到它们.

最后但并非最不重要的是我总是收到SQLITE_ERROR: Connection is closed错误.它看起来与此问题中描述的问题相同:SQLITE_ERROR:从Spark通过JDBC连接到SQLite数据库时连接已关闭

Ber*_*ium 5

您可以尝试两种选择

直接使用JDBC

  • 在Spark作业中打开一个单独的普通JDBC连接
  • 从JDBC元数据中获取表名
  • 把这些都融入你的for理解中

对"dbtable"参数使用SQL查询

您可以将查询指定为dbtable参数的值.从语法上讲,这个查询必须"看起来"像一个表,所以它必须包装在一个子查询中.

在该查询中,从数据库中获取元数据:

val df = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:xxx",
    "user" -> "x",
    "password" -> "x",
    "dbtable" -> "(select * from pg_tables) as t")).load()
Run Code Online (Sandbox Code Playgroud)

这个例子适用于PostgreSQL,你必须适应它的SQLite.

更新

似乎JDBC驱动程序仅支持迭代一个结果集.无论如何,当您使用时实现表名列表时collect(),以下代码段应该起作用:

val myTableNames = metaData.select("tbl_name").map(_.getString(0)).collect()

for (t <- myTableNames) {
  println(t.toString)

  val tableData = sqlContext.read.format("jdbc")
    .options(
      Map(
        "url" -> "jdbc:sqlite:/x.db",
        "dbtable" -> t)).load()

  tableData.show()
}
Run Code Online (Sandbox Code Playgroud)