How to create a table in a MySQL database using Apache Spark

svk*_*994 2 scala apache-spark apache-spark-sql

I am trying to build a Spark application that can create, read, write, and update MySQL data. Is there a way to create a MySQL table using Spark?

Below is my Scala JDBC code, which creates a table in a MySQL database. How can I do the same thing through Spark?

package SparkMysqlJdbcConnectivity

import org.apache.spark.sql.SparkSession
import java.util.Properties
import java.lang.Class
import java.sql.Connection
import java.sql.DriverManager

object MysqlSparkJdbcProgram {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("MysqlJDBC Connections")
      .master("local[*]")
      .getOrCreate()

    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/world"
    val operationtype = "create table"
    val tablename = "country"
    val tablename2 = "state"

    val connectionProperties = new Properties()

    connectionProperties.put("user", "root")
    connectionProperties.put("password", "root")

    val jdbcDf = spark.read.jdbc(url, s"${tablename}", connectionProperties)

    operationtype.trim() match {
      case "create table" => {
        // Class.forName(driver) is not needed: JDBC 4+ drivers register themselves
        val con: Connection = DriverManager.getConnection(url, connectionProperties)
        try {
          // execute() returns false for DDL because there is no ResultSet; failures throw SQLException
          val result = con.prepareStatement(s"create table ${tablename2} (name varchar(255), country varchar(255))").execute()
          println(result)
          println("table creation is successful")
        } finally {
          con.close()
        }
      }

      case "read table" => {

        val jdbcDf = spark.read.jdbc("jdbc:mysql://localhost:3306/world", s"${tablename}", connectionProperties)
        jdbcDf.show()
      }

      case "write table" => {}

      case "drop table"  => {}

    }

  }

}

Rav*_*mar 6

Spark creates the table automatically when you write the jdbcDf DataFrame, provided the target table does not already exist.

jdbcDf
 .write
 .jdbc("jdbc:mysql://localhost:3306/world", s"${tablename}", connectionProperties)
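To create an empty table purely through Spark, one option is to write an empty DataFrame that carries the desired schema. The following is a minimal sketch, not a definitive implementation. It assumes MySQL Connector/J is on the classpath and reuses the URL and credentials from the question. The names `CreateEmptyTable`, `stateSchema`, and `createTable` are hypothetical and chosen only for illustration.

```scala
import java.util.Properties
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object CreateEmptyTable {
  // Hypothetical schema mirroring the `state` table from the question.
  val stateSchema: StructType = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("country", StringType, nullable = true)
  ))

  // Writes an empty DataFrame so that Spark issues the CREATE TABLE itself.
  def createTable(spark: SparkSession, url: String, table: String, props: Properties): Unit = {
    val empty = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], stateSchema)
    empty.write
      .mode(SaveMode.ErrorIfExists) // the default mode: fail if the table already exists
      .jdbc(url, table, props)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CreateEmptyTable")
      .master("local[*]")
      .getOrCreate()

    val props = new Properties()
    props.put("user", "root")     // credentials as used in the question
    props.put("password", "root")

    try {
      createTable(spark, "jdbc:mysql://localhost:3306/world", "state", props)
    } finally {
      spark.stop()
    }
  }
}
```

Spark picks the MySQL column types from its default mapping for each Spark type, so the string columns here will not necessarily come out as varchar(255). Switching to SaveMode.Append or SaveMode.Overwrite changes what happens when the table already exists.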

If you want to specify the column types of the created table, set the createTableColumnTypes option:

jdbcDf
 .write
 .option("createTableColumnTypes", "name VARCHAR(500), col1 VARCHAR(1024), col3 int")
 .jdbc("jdbc:mysql://localhost:3306/world", s"${tablename}", connectionProperties)
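The column names listed in createTableColumnTypes have to match columns that exist in the DataFrame being written, so name, col1, and col3 above stand in for your own columns. As a rough sketch, assuming a DataFrame with name and country columns like the `state` table in the question, the option could be applied as follows. `CreateTableWithColumnTypes`, `columnTypes`, and the sample rows are hypothetical and only for illustration.

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object CreateTableWithColumnTypes {
  // Hypothetical helper: renders (column, SQL type) pairs in the
  // "col1 TYPE, col2 TYPE" form that createTableColumnTypes expects.
  def columnTypes(cols: Seq[(String, String)]): String =
    cols.map { case (name, sqlType) => s"$name $sqlType" }.mkString(", ")

  // Writes df to a new table, overriding Spark's default column types.
  def write(df: DataFrame, url: String, table: String, props: Properties): Unit =
    df.write
      .option("createTableColumnTypes",
        columnTypes(Seq("name" -> "VARCHAR(255)", "country" -> "VARCHAR(255)")))
      .jdbc(url, table, props)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CreateTableWithColumnTypes")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val props = new Properties()
    props.put("user", "root")     // credentials as used in the question
    props.put("password", "root")

    // Made-up sample rows; the column names match the types listed above.
    val states = Seq(("Bavaria", "Germany"), ("Texas", "United States"))
      .toDF("name", "country")

    try {
      write(states, "jdbc:mysql://localhost:3306/world", "state", props)
    } finally {
      spark.stop()
    }
  }
}
```

The option only takes effect when Spark creates the table. If the table already exists, the column types already defined in MySQL stay as they are.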