Related questions and solutions (0)

How to optimize partitioning when migrating data from a JDBC source?

I am trying to move data from a table in a PostgreSQL database to a Hive table on HDFS. To do that, I came up with the following code:

  val conf = new SparkConf().setAppName("Spark-JDBC")
    .set("spark.executor.heartbeatInterval", "120s").set("spark.network.timeout", "12000s")
    .set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown", "true")
    .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName) // was set twice; one setting suffices
    .set("spark.kryoserializer.buffer.max", "512m").set("spark.streaming.stopGracefullyOnShutdown", "true")
    .set("spark.yarn.driver.memoryOverhead", "7168").set("spark.yarn.executor.memoryOverhead", "7168")
    .set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60")
    .set("spark.memory.storageFraction", "0.5").set("spark.memory.fraction", "0.6")
    .set("spark.memory.offHeap.enabled", "true").set("spark.memory.offHeap.size", "16g")
    .set("spark.dynamicAllocation.enabled", "true") // was set to false and then true; the later value wins
    .set("spark.shuffle.service.enabled", "true")
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport()
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .getOrCreate()

  def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String,
                     dataMapper: Map[String, String], partition_columns: Array[String],
                     spark: SparkSession): DataFrame = {
    val colList               = allColumns.split(",").toList
    val (partCols, npartCols) = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
    val queryCols             = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
    val execQuery             = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
    val yearDF                = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", …
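For reference, and not part of the original question: a minimal sketch of the usual way to parallelize a JDBC read, using the partitionColumn / lowerBound / upperBound / numPartitions options, and then write the result out as a partitioned Hive table. The connection URL, credentials, and table names below are placeholders.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("Spark-JDBC").enableHiveSupport().getOrCreate()

  // Placeholder connection details.
  val connectionUrl = "jdbc:postgresql://dbhost:5432/mydb"

  // Spark issues numPartitions parallel queries, each covering one stride of
  // [lowerBound, upperBound] on partitionColumn (which must be numeric, date,
  // or timestamp). Rows outside the bounds still land in the edge partitions.
  val yearDF = spark.read.format("jdbc")
    .option("url", connectionUrl)
    .option("dbtable", "(select * from schema.tablename where period_year = '2017') as t")
    .option("partitionColumn", "period_num") // assumed to be numeric
    .option("lowerBound", "1")
    .option("upperBound", "12")
    .option("numPartitions", "12")
    .option("user", "dbuser")
    .option("password", "dbpass")
    .load()

  // Dynamic-partition insert, matching the hive.exec.dynamic.partition settings above.
  yearDF.write.mode("overwrite").format("orc")
    .partitionBy("period_num")
    .saveAsTable("hive_db.tablename")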

hive partitioning jdbc apache-spark apache-spark-sql

9 votes · 1 answer · 2,399 views

How to prevent predicate pushdown?

I have recently been working with Spark using a JDBC data source. Consider the following snippet:

val df = spark.read.format("jdbc").options(options).load()
val newDF = df.where(PRED)

where PRED is a list of predicates.

If PRED is a simple predicate such as x = 10, the query runs much faster. However, with non-equality conditions such as date > someOtherDate or date < someOtherDate2, the query is much slower than it would be without predicate pushdown. As you may know, database engines scan such predicates very slowly, in my case even 10 times slower(!).

To prevent unnecessary predicate pushdown, I used:

val cachedDF = df.cache()
val newDF = cachedDF.where(PRED)
Run Code Online (Sandbox Code Playgroud)

But it requires a lot of memory, and because of the issue mentioned here - Spark's Dataset unpersist behavior - I cannot unpersist cachedDF.

Is there any other option to avoid predicate pushdown, without caching and without writing my own data source?

Note: even if there is an option to turn off predicate pushdown, it is only useful if other queries may still rely on pushdown. So, if I write:

// some fancy option set to not push down predicates
val df1 = ...
// predicate pushdown works again
val df2 = ...
df1.join(df2) // where …
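Two workarounds that are commonly suggested for this (a sketch, not from the question): Spark 2.4+ exposes a per-relation JDBC option, pushDownPredicate, which addresses exactly the per-query concern in the note above; on older versions, round-tripping through an RDD inserts an optimization barrier. Here, url, PRED, and the table name are placeholders.

  // Spark 2.4+: disable pushdown for this relation only; other JDBC reads
  // in the same job keep pushing their predicates down.
  val df1 = spark.read.format("jdbc")
    .option("url", url)
    .option("dbtable", "schema.table1")
    .option("pushDownPredicate", "false")
    .load()

  // Older versions: Catalyst cannot see past an RDD boundary, so after the
  // round-trip PRED is evaluated in Spark rather than in the database.
  val barrier = spark.createDataFrame(df1.rdd, df1.schema)
  val newDF   = barrier.where(PRED)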

apache-spark apache-spark-sql

5 votes · 1 answer · 1,435 views

Spark SQL - Load data with JDBC using a SQL statement, not a table name

I think I am missing something, but I can't figure out what. I want to load data using SQLContext and JDBC with a specific SQL statement, such as:

select top 1000 text from table1 with (nolock)
where threadid in (
  select distinct id from table2 with (nolock)
  where flag=2 and date >= '1/1/2015' and userid in (1, 2, 3)
)

Which SQLContext method should I use? The examples I have seen always specify a table name and lower and upper bounds.

Thanks in advance.
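For reference (a sketch, not from the original question): the usual trick is to wrap the statement in a parenthesized subquery with an alias and pass it as the dbtable option; Spark 2.4+ also accepts a dedicated query option. The connection URL below is a placeholder, assuming SQL Server given the T-SQL syntax.

  val query =
    """(select top 1000 text from table1 with (nolock)
      | where threadid in (
      |   select distinct id from table2 with (nolock)
      |   where flag=2 and date >= '1/1/2015' and userid in (1, 2, 3)
      | )) as subq""".stripMargin

  val df = sqlContext.read.format("jdbc")
    .option("url", "jdbc:sqlserver://dbhost;databaseName=mydb") // placeholder
    .option("dbtable", query)
    .load()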

apache-spark apache-spark-sql

2 votes · 1 answer · 2,845 views