How do I use numPartitions, lowerBound and upperBound in a spark-jdbc connection?


I'm trying to read a table from a Postgres DB using spark-jdbc. For that, I came up with the following code:

import java.io.FileInputStream
import java.util.Properties

import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object PartitionRetrieval {
  // Spark configuration tuned for a long-running JDBC read
  val conf = new SparkConf()
    .setAppName("Spark-JDBC")
    .set("spark.executor.heartbeatInterval", "120s")
    .set("spark.network.timeout", "12000s")
    .set("spark.default.parallelism", "20")
  val log = LogManager.getLogger("Spark-JDBC Program")
  Logger.getLogger("org").setLevel(Level.ERROR)

  // Connection details are read from a properties file
  val conFile       = "/home/myuser/ReconTest/inputdir/testconnection.properties"
  val properties    = new Properties()
  properties.load(new FileInputStream(conFile))
  val connectionUrl = properties.getProperty("gpDevUrl")
  val devUserName   = properties.getProperty("devUserName")
  val devPassword   = properties.getProperty("devPassword")
  val driverClass   = properties.getProperty("gpDriverClass")
  val tableName     = "base.ledgers"

  // Fail fast if the JDBC driver is not on the classpath
  try {
    Class.forName(driverClass).newInstance()
  } catch {
    case _: ClassNotFoundException =>
      log.error("Driver class: " + driverClass + " not found")
      System.exit(1)
    case e: Exception =>
      log.error("Exception while loading the driver class", e)
      System.exit(1)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .config(conf)
      .master("yarn")
      .enableHiveSupport()
      .getOrCreate()

    val gpTable = spark.read.format("jdbc")
      .option("url", connectionUrl)
      .option("dbtable", tableName)
      .option("user", devUserName)
      .option("password", devPassword)
      .load()

    val rc = gpTable
      .filter(gpTable("source_system_name") === "ORACLE" && gpTable("period_year") === "2017")
      .count()
    println("gpTable Count: " + rc)
  }
}

Right now I'm just fetching the row count to see whether the connection succeeds or fails. It's a huge table, and fetching the count is slow; as I understand it, that's because I haven't supplied any parameters for the number of partitions or for the column on which the data should be partitioned.

In many places I've seen the jdbc object being created like this:

val gpTable2 = spark.read.jdbc(connectionUrl, tableName, connectionProperties) 
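
For context, DataFrameReader also has a jdbc overload that takes the partitioning parameters directly as arguments, alongside the connection properties. A minimal sketch, assuming connectionProperties carries the user/password, and where "partition_col" and the bound values are placeholders, not columns from the actual table:

val gpTable2 = spark.read.jdbc(
  connectionUrl,
  tableName,
  "partition_col",   // partitionColumn: an integer-like column to split on (placeholder)
  1L,                // lowerBound (placeholder)
  1000000L,          // upperBound (placeholder)
  10,                // numPartitions
  connectionProperties
)

There is also an overload that accepts an Array[String] of per-partition predicates instead of a column and bounds.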

And I've created mine in a different style, using options. When the jdbc connection is formed with options, I can't figure out how to supply numPartitions and the name of the column on which I want the data to be partitioned:

val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", tableName).option("user", devUserName).option("password", devPassword).load()

Could anyone let me know:

  1. How do I add the parameters numPartitions, lowerBound, upperBound to a jdbc object written in this style:

    val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", tableName).option("user", devUserName).option("password", devPassword).load()

  2. How do I add only the partition column name and numPartitions? I want to fetch all the rows for the year 2017, so I don't want rows picked by a range (lowerBound, upperBound).

Answered by 小智 (5 votes):

The options numPartitions, lowerBound, upperBound and partitionColumn control how the read is parallelized in Spark. partitionColumn needs to be an integer column. If there is no suitable column in the table, you can use ROW_NUMBER as the partition column.

Try this:

// Count the matching rows first; the result is used below as the upperBound.
val rowCount = spark.read.format("jdbc")
  .option("url", connectionUrl)
  // Postgres requires an alias on a subquery used as "dbtable"
  .option("dbtable", "(select count(*) AS count from tableName where source_system_name = 'ORACLE' AND period_year = '2017') AS tmp")
  .option("user", devUserName)
  .option("password", devPassword)
  .load()
  .collect()
  .map(row => row.getAs[Long]("count")).head   // count(*) comes back as a bigint/Long

This gives us the number of rows matching the predicate, which can be used as the upperBound.

// ROW_NUMBER provides a synthetic integer column (RNO) to partition on.
val gpTable = spark.read.format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", "(select ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS RNO, * from tableName where source_system_name = 'ORACLE' AND period_year = '2017') AS tmp")
  .option("user", devUserName)
  .option("password", devPassword)
  .option("numPartitions", 10)
  .option("partitionColumn", "RNO")
  .option("lowerBound", 1)
  .option("upperBound", rowCount)
  .load()

numPartitions determines how many parallel connections are opened against the Postgres DB. You can tune it to the degree of parallelism you need when reading from the database.
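
To make the bounds' effect concrete: Spark splits the span between lowerBound and upperBound into numPartitions strides and issues one query per stride; rows below the lower bound or above the upper bound are not dropped, they simply land in the first or last partition. A simplified sketch of the stride arithmetic (illustrative only, not Spark's exact internal code; the bound values are made up):

// Simplified illustration of how the per-partition WHERE clauses are derived.
val lowerBound    = 1L
val upperBound    = 1000000L   // e.g. the rowCount computed above
val numPartitions = 10

val stride = (upperBound - lowerBound) / numPartitions
val predicates = (0 until numPartitions).map { i =>
  val start = lowerBound + i * stride
  if (i == 0) s"RNO < ${start + stride} OR RNO IS NULL"          // first partition also takes rows below the lower bound
  else if (i == numPartitions - 1) s"RNO >= $start"              // last partition takes everything above
  else s"RNO >= $start AND RNO < ${start + stride}"
}
predicates.foreach(println)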