r4r*_*008 5 apache-spark apache-spark-sql
我的数据集以这种方式分区:
Year=yyyy
|---Month=mm
| |---Day=dd
| | |---<parquet-files>
Run Code Online (Sandbox Code Playgroud)
在两个日期之间加载数据的spark中创建数据框的最简单有效的方法是什么?
如果您必须坚持这种分区策略,答案取决于您是否愿意承担分区发现成本.
如果您愿意让Spark发现所有分区,只需要发生一次(直到您添加新文件),您可以加载基本路径,然后使用分区列进行过滤.
如果您不希望Spark发现所有分区,例如,因为您有数百万个文件,唯一有效的通用解决方案是将您要查询的间隔分成几个子区间,您可以使用@ r0bb23的方法轻松查询这些区间然后联合在一起.
如果您想要上述两种情况中的最佳情况并且您具有稳定的架构,则可以通过定义外部分区表来在Metastore中注册分区.如果您希望您的架构随着Metastore管理的表格在此时管理架构演变而发展,那么请不要这样做.
例如,要在2017-10-06
和之间进行查询2017-11-03
:
// With full discovery
spark.read.parquet("hdfs:///basepath")
.where('Year === 2017 && (
('Month === 10 && 'Day >= 6') || ('Month === 11 && 'Day <= 3')
))
// With partial discovery
val df1 = spark.read.option("basePath", "hdfs:///basepath/")
.parquet("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9], [1-3][0-9]}/*/")
val df2 = spark.read.option("basePath", "hdfs:///basepath/")
.parquet("hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
val df = df1.union(df2)
Run Code Online (Sandbox Code Playgroud)
为此编写通用代码当然是可能的,但我没有遇到过.更好的方法是按照我对问题所做的评论中概述的方式进行分区.如果您的表使用类似的东西进行分区,/basepath/ts=yyyymmddhhmm/*.parquet
那么答案很简单:
spark.read.parquet("hdfs:///basepath")
.where('ts >= 201710060000L && 'ts <= 201711030000L)
Run Code Online (Sandbox Code Playgroud)
值得添加小时和分钟的原因是,您可以编写处理间隔的通用代码,无论您是按周,日,小时还是每15分钟划分数据.实际上,您甚至可以在同一个表中管理具有不同粒度的数据,例如,较旧的数据在较高级别聚合以减少需要发现的分区总数.
编辑添加多个加载路径到地址注释。
您可以使用正则表达式样式语法。
val dataset = spark
.read
.format("parquet")
.option("filterPushdown", "true")
.option("basePath", "hdfs:///basepath/")
.load("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9],[1-3][0-9]}/*/",
"hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
Run Code Online (Sandbox Code Playgroud)
如何使用正则表达式在 sc.textFile 中包含/排除某些输入文件?
注意:如果你想要所有的日子、月份等,你不需要X=*
你可以做的*
。
您可能还应该阅读有关Predicate Pushdown 的内容(即上面将 filterPushdown 设置为 true)。
最后,您会注意到上面的 basepath 选项,其原因可以在这里找到:防止 DataFrame.partitionBy() 从架构中删除分区列
归档时间: |
|
查看次数: |
5388 次 |
最近记录: |