spark jdbc df limit ...它在做什么?

MK.*_*MK. 3 apache-spark apache-spark-sql

我正在努力学习如何了解Spark内部的情况,这是我目前的困惑.我正在尝试将Oracle表中的前200行读入Spark:

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:...",
  "dbtable" -> "schema.table",
  "fetchSize" -> "5000",
  "partitionColumn" -> "my_row_id",
  "numPartitions" -> "16",
  "lowerBound" -> "0",
  "upperBound" -> "9999999"
  )).load()

jdbcDF.limit(200).count()
Run Code Online (Sandbox Code Playgroud)

我希望,这相当快.在具有500K行的表上的类似操作在合理的时间内完成.在这种特殊情况下,表格要大得多(数亿行),但是我认为限制(200)会使它变快吗?我该如何计算出花费时间的地方?

eli*_*sah 7

事实上,火花还没有能力limit推翻谓词.

所以实际上在这种情况下发生的事情是它将所有数据拉到火花然后限制和计数.您需要的是在子查询中将其用作表参数.

例如:

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:...",
  "dbtable" -> "(select * from schema.table limit 200) as t",
  "fetchSize" -> "5000",
  "partitionColumn" -> "my_row_id",
  "numPartitions" -> "16",
  "lowerBound" -> "0",
  "upperBound" -> "9999999"
  )).load()
Run Code Online (Sandbox Code Playgroud)

因此,主要是花费时间的地方是拉动所有数据.

您还可以在子查询中动态传递限制:

val n : Int = ???

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:...",
  "dbtable" -> s"(select * from schema.table limit $n) as t",
  "fetchSize" -> "5000",
  "partitionColumn" -> "my_row_id",
  "numPartitions" -> "16",
  "lowerBound" -> "0",
  "upperBound" -> "9999999"
  )).load()
Run Code Online (Sandbox Code Playgroud)

有一个JIRA机票(SPARK-10899) 正在进行中以解决这个问题,但它已经挂了将近一年.

编辑:由于上述JIRA中的问题被标记为重复.您可以在此处继续跟踪问题- SPARK-12126.我希望这能回答你的问题.

  • 这不是同一件事@y2k-shubham :) 我们正在谈论您在 spark 方面定义的谓词。 (2认同)