I am trying to move data from a table in PostgreSQL to a Hive table on HDFS. To do that, I came up with the following code:
val conf = new SparkConf().setAppName("Spark-JDBC")
  .set("spark.executor.heartbeatInterval", "120s")
  .set("spark.network.timeout", "12000s")
  .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
  .set("spark.sql.orc.filterPushdown", "true")
  .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
  .set("spark.kryoserializer.buffer.max", "512m")
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
  .set("spark.yarn.driver.memoryOverhead", "7168")
  .set("spark.yarn.executor.memoryOverhead", "7168")
  .set("spark.sql.shuffle.partitions", "61")
  .set("spark.default.parallelism", "60")
  .set("spark.memory.storageFraction", "0.5")
  .set("spark.memory.fraction", "0.6")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "16g")
  .set("spark.dynamicAllocation.enabled", "true")  // dynamic allocation requires the external shuffle service below
  .set("spark.shuffle.service.enabled", "true")
val spark = SparkSession.builder().config(conf).master("yarn")
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .enableHiveSupport()
  .getOrCreate()
def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String,
                   dataMapper: Map[String, String], partition_columns: Array[String],
                   spark: SparkSession): DataFrame = {
  val colList = allColumns.split(",").toList
  // each entry looks like "colname coltype"; compare just the column name against the partition list
  val (partCols, npartCols) = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
  // non-partition columns first, then a constant 0 flag column (flagCol comes from the enclosing scope),
  // then the partition columns last
  val queryCols = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
  val execQuery = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
  val yearDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", …
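The original read is truncated at the dbtable option. A minimal sketch of how a partitioned JDBC read over execQuery is typically completed; the partition column, bounds, and credential names here are assumptions, not the original code:

  val yearDF = spark.read.format("jdbc")
    .option("url", connectionUrl)
    .option("dbtable", s"(${execQuery}) as year2017")  // a query passed as dbtable must be wrapped and aliased
    .option("user", devUserName)                       // hypothetical credentials from the enclosing scope
    .option("password", devPassword)
    .option("partitionColumn", "cast_id")              // assumed numeric column to split the read on
    .option("lowerBound", "1")
    .option("upperBound", "100")
    .option("numPartitions", "60")
    .load()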
Recently I have been working with Spark and a JDBC data source. Consider the following snippet:

val df = spark.read.options(options).format("jdbc").load()
val newDF = df.where(PRED)
PRED is a list of predicates.
If PRED is a simple predicate, such as x = 10, the query runs much faster. However, with inequality conditions such as date > someOtherDate or date < someOtherDate2, the query is much slower than it is without predicate pushdown. As you may know, database engines scan on such predicates very slowly, in my case even 10 times slower (!).
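A quick way to confirm what is actually sent to the database is to inspect the physical plan: the JDBC scan node lists the filters it pushed. A sketch, assuming df from the snippet above and a hypothetical date column:

import org.apache.spark.sql.functions.col

val filteredDF = df.where(col("date") > "2015-01-01")  // hypothetical inequality predicate
filteredDF.explain(true)  // look for PushedFilters: [GreaterThan(date,...)] on the JDBC scan node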
To prevent the unwanted predicate pushdown, I used:
val cachedDF = df.cache()
val newDF = cachedDF.where(PRED)
But it requires a lot of memory, and, because of the problem mentioned here (Spark's Dataset unpersist behavior), I am unable to unpersist cachedDF.
Are there any other options to avoid pushing down predicates, without caching and without writing my own data source?
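Two approaches that are sometimes suggested for this, both hedged sketches rather than guaranteed solutions: on Spark 2.4+ the JDBC source accepts a per-read pushDownPredicate option, which addresses the per-query concern in the note below; on older versions, rebuilding the DataFrame from its RDD puts an analysis barrier between the JDBC relation and later filters:

// Spark 2.4+: disable pushdown for this one reader only; other readers keep it
val dfNoPush = spark.read.format("jdbc")
  .option("url", url)                      // assumed connection settings
  .option("dbtable", "schema.tablename")
  .option("pushDownPredicate", "false")
  .load()

// Older versions: recreating the DataFrame from its RDD severs the logical plan,
// so the where() below runs in Spark instead of being pushed to the database
val barrierDF = spark.createDataFrame(df.rdd, df.schema)
val newDF = barrierDF.where(PRED)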
Note: even if there is an option to turn off predicate pushdown, it only makes sense if other queries can still use it. So, if I write:
// some fancy option set to not push down predicates
val df1 = ...
// predicate pushdown works again
val df2 = ...
df1.join(df2) // where …

I think I am missing something but cannot figure out what. I want to load data using SQLContext and JDBC with a particular SQL statement, such as:
select top 1000 text from table1 with (nolock)
where threadid in (
select distinct id from table2 with (nolock)
where flag=2 and date >= '1/1/2015' and userid in (1, 2, 3)
)
Which SQLContext method should I use? The examples I have seen always specify a table name and lower and upper bounds.
Thanks in advance.
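For what it is worth, the usual way to run an arbitrary statement through the JDBC source is to pass it as an aliased subquery in the dbtable option rather than a table name. A sketch assuming a SQL Server connection; the URL and credentials are placeholders:

val query = """(select top 1000 text from table1 with (nolock)
  where threadid in (
    select distinct id from table2 with (nolock)
    where flag=2 and date >= '1/1/2015' and userid in (1, 2, 3)
  )) as t"""  // the subquery must carry an alias

val df = sqlContext.read.format("jdbc")
  .option("url", "jdbc:sqlserver://host:1433;databaseName=mydb")  // placeholder URL
  .option("dbtable", query)
  .option("user", "user")          // placeholder credentials
  .option("password", "password")
  .load()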