D. *_*ler 3 wildcard hdfs apache-spark
我想通过将巨大的csv文件细分为不同的分区来优化Spark应用程序的运行时,这取决于它们的特性.
例如,我有一个客户ID(整数,A),与日期的列(月+年,如01.2015,B)一列,并与产品ID列(整数,C)(多列有产品的具体数据,而不是分区所需的).
我想构建一个类似的文件夹结构/customer/a/date/b/product/c.当用户想要了解2016年1月出售的客户X的产品信息时,他可以加载并分析保存的文件/customer/X/date/01.2016/*.
是否有可能通过通配符加载此类文件夹结构?还应该可以加载特定时间范围的所有客户或产品,例如01.2015至2015年9月.是否可以使用通配符/customer/*/date/*.2015/product/c?或者如何解决这样的问题呢?
我想对数据进行一次分区,然后在分析中加载特定文件,以减少这些作业的运行时间(忽略分区的额外工作).
解决方案:使用Parquet文件
我更改了我的Spark应用程序以将我的数据保存到Parquet文件,现在一切正常,我可以通过给出文件夹结构来预先选择数据.这是我的代码片段:
JavaRDD<Article> goodRdd = ...
SQLContext sqlContext = new SQLContext(sc);
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("keyStore", DataTypes.IntegerType, false));
fields.add(DataTypes.createStructField("textArticle", DataTypes.StringType, false));
StructType schema = DataTypes.createStructType(fields);
JavaRDD<Row> rowRDD = goodRdd.map(new Function<Article, Row>() {
public Row call(Article article) throws Exception {
return RowFactory.create(article.getKeyStore(), article.getTextArticle());
}
});
DataFrame storeDataFrame = sqlContext.createDataFrame(rowRDD, schema);
// WRITE PARQUET FILES
storeDataFrame.write().partitionBy(fields.get(0).name()).parquet("hdfs://hdfs-master:8020/user/test/parquet/");
// READ PARQUET FILES
DataFrame read = sqlContext.read().option("basePath", "hdfs://hdfs-master:8020/user/test/parquet/").parquet("hdfs://hdfs-master:8020/user/test/parquet/keyStore=1/");
System.out.println("READ : " + read.count());
Run Code Online (Sandbox Code Playgroud)
重要
不要尝试只有一列的表!当您尝试调用该partitionBy方法时,您将获得例外!
Gle*_*olt 24
因此,在Spark中,您可以按照您要查找的方式保存和读取分区数据.但是,与/customer/a/date/b/product/c使用Spark 一样创建路径时,Spark将在使用以下方法/customer=a/date=b/product=c保存数据时使用此约定:
df.write.partitionBy("customer", "date", "product").parquet("/my/base/path/")
Run Code Online (Sandbox Code Playgroud)
当您需要读入数据时,需要指定basepath-option如下:
sqlContext.read.option("basePath", "/my/base/path/").parquet("/my/base/path/customer=*/date=*.2015/product=*/")
Run Code Online (Sandbox Code Playgroud)
以下内容/my/base/path/将被Spark解释为列.在这里给出的示例中,Spark将添加三列customer,date并添加product到数据帧.请注意,您可以根据需要对任何列使用通配符.
至于在特定时间范围内读取数据,您应该知道Spark使用谓词下推,因此它实际上只会将数据加载到符合条件的内存中(由某些过滤器转换指定).但是,如果您确实要明确指定范围,则可以生成路径名列表,然后将其传递给read函数.像这样:
val pathsInMyRange = List("/my/path/customer=*/date=01.2015/product=*",
"/my/path/customer=*/date=02.2015/product=*",
"/my/path/customer=*/date=03.2015/product=*"...,
"/my/path/customer=*/date=09.2015/product=*")
sqlContext.read.option("basePath", "/my/base/path/").parquet(pathsInMyRange:_*)
Run Code Online (Sandbox Code Playgroud)
无论如何,我希望这有助于:)
| 归档时间: |
|
| 查看次数: |
4267 次 |
| 最近记录: |