Met*_*ata 9 hive partitioning jdbc apache-spark apache-spark-sql
我试图将数据从PostgreSQL表中的表移动到HDFS上的Hive表.为此,我想出了以下代码:
val conf = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
val colList = allColumns.split(",").toList
val (partCols, npartCols) = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
val queryCols = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
val execQuery = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
val yearDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
.option("user", devUserName).option("password", devPassword)
.option("partitionColumn","cast_id")
.option("lowerBound", 1).option("upperBound", 100000)
.option("numPartitions",70).load()
val totalCols:List[String] = splitColumns ++ textList
val cdt = new ChangeDataTypes(totalCols, dataMapper)
hiveDataTypes = cdt.gpDetails()
val fc = prepareHiveTableSchema(hiveDataTypes, partition_columns)
val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
val allCols = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
val resultDF = yearDF.select(allCols:_*)
val stringColumns = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
val finalDF = stringColumns.foldLeft(resultDF) {
(tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
}
finalDF
}
val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
val dataDFPart = dataDF.repartition(30)
dataDFPart.createOrReplaceTempView("preparedDF")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")
Run Code Online (Sandbox Code Playgroud)
数据被插入到基于动态分区的hive表中 prtn_String_columns: source_system_name, period_year, period_num
使用Spark-submit:
SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal fdlhdpetl@FDLDEV.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar
Run Code Online (Sandbox Code Playgroud)
执行程序日志中生成以下错误消息:
Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
at java.util.jar.JarFile.getManifest(JarFile.java:180)
at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
我在日志中看到正在使用给定数量的分区正确执行读取,如下所示:
Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]
Run Code Online (Sandbox Code Playgroud)
数据未正确分区.一个分区较小而另一个分区变大.这里存在偏差问题.在将数据插入Hive表时,作业在该行失败:spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")但我知道这是因为数据偏差问题而发生的.
我试图增加执行程序的数量,增加执行程序内存,驱动程序内存,试图保存为csv文件,而不是将数据帧保存到Hive表中,但没有任何因为给出异常而影响执行:
java.lang.OutOfMemoryError: GC overhead limit exceeded
Run Code Online (Sandbox Code Playgroud)
我需要纠正代码中的任何内容吗?任何人都可以告诉我如何解决这个问题?
给定输入数据量和群集资源,确定需要多少个分区。根据经验,除非绝对必要,否则最好将分区输入保持在1GB以下。并且严格小于块大小限制。
您之前已经声明过,迁移用于不同帖子(5-70)的1TB数据值很可能会变低,以确保过程顺利进行。
尝试使用不需要进一步的价值repartitioning。
了解您的数据。
分析数据集中可用的列,以确定是否有任何具有高基数和均匀分布的列要分布在所需数量的分区中。这些是导入过程的良好候选者。此外,您应该确定一个确切的值范围。
具有不同集中度和偏度度量的聚合以及直方图和按键的基本计数是很好的探索工具。对于此部分,最好直接在数据库中分析数据,而不是将其提取到Spark。
根据不同的RDBMS您可能能够使用width_bucket(PostgreSQL的,甲骨文)或同等功能得到一个体面的想法是如何的数据将分布在星火负荷后partitionColumn,lowerBound,upperBound,numPartitons。
s"""(SELECT width_bucket($partitionColum, $lowerBound, $upperBound, $numPartitons) AS bucket, COUNT(*)
FROM t
GROUP BY bucket) as tmp)"""
Run Code Online (Sandbox Code Playgroud)如果没有满足上述条件的列,请考虑:
DBMS_CRYPTO在Oracle中,pgcrypto在PostgreSQL中)*。使用一组独立的列,它们在一起提供了足够高的基数。
(可选)如果要写入分区的Hive表,则应考虑包括Hive分区列。它可能会限制以后生成的文件数量。
准备分区参数
如果在前面的步骤中选择或创建的列是数字(或Spark> = 2.4中的日期/时间戳),则直接将其提供为,partitionColumn并使用在填充lowerBound和之前确定的范围值upperBound。
如果绑定值不能反映数据的属性(min(col)for lowerBound,max(col)for upperBound),则可能会导致明显的数据偏斜,因此请谨慎处理线程。在最坏的情况下,当边界不覆盖数据范围时,所有记录将由单台计算机获取,这使其根本不比没有分区好。
如果在前面的步骤中选择的列是分类列,或者是一组列,则将生成可以完全覆盖数据的互斥谓词列表,其形式可以在SQLwhere子句中使用。
例如,如果你有一列A与值{ a1,a2,a3}和列B与值{ b1,b2,b3}:
val predicates = for {
a <- Seq("a1", "a2", "a3")
b <- Seq("b1", "b2", "b3")
} yield s"A = $a AND B = $b"
Run Code Online (Sandbox Code Playgroud)
仔细检查条件是否重叠,是否涵盖所有组合。如果不满足这些条件,则最终将导致重复或缺少记录。
将数据作为predicates参数传递给jdbc调用。请注意,分区的数量将完全等于谓词的数量。
将数据库置于只读模式(任何正在进行的写入都可能导致数据不一致。如果可能,应在开始整个过程之前锁定数据库,但如果不能,则在组织中)。
如果分区的数量与所需的输出负载数据相匹配,而没有repartition并将其直接转储到接收器,则如果不满足,您可以尝试按照与步骤1中相同的规则进行重新分区。
如果仍然遇到任何问题,请确保已正确配置了Spark内存和GC选项。
如果以上都不起作用:
考虑将数据转储到网络/使用类似的工具分配存储COPY TO并直接从那里读取。
注意,使用标准的POS或标准数据库实用程序通常需要兼容POSIX的文件系统,因此HDFS通常不需要。
这种方法的优点是您不必担心列属性,也无需将数据置于只读模式即可确保一致性。
使用专用的批量传输工具(例如Apache Sqoop),然后重塑数据。
* 不要使用伪列-Spark JDBC中的伪列。
| 归档时间: |
|
| 查看次数: |
2399 次 |
| 最近记录: |