D. *_*ler 7 dataset rdd apache-spark-dataset apache-spark-2.0
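I want to adapt my Java Spark application (which currently uses RDDs for some computations) to use Datasets instead of RDDs. I'm new to Datasets and don't know which RDD transformation maps to which Dataset operation.
Currently I map them as follows: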
JavaSparkContext.textFile(...) -> SQLContext.read().textFile(...)
JavaRDD.filter(Function) -> Dataset.filter(FilterFunction)
JavaRDD.map(Function) -> Dataset.map(MapFunction)
JavaRDD.mapToPair(PairFunction) -> Dataset.groupByKey(MapFunction) ???
JavaPairRDD.aggregateByKey(U, Function2, Function2) -> KeyValueGroupedDataset.???
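A note on the mapToPair -> groupByKey line in the mapping above: JavaRDD.mapToPair turns every element into an explicit (key, value) Tuple2, whereas Dataset.groupByKey takes only a key-extracting MapFunction plus an Encoder for the key and keeps each element whole inside its group. The sketch below is a plain-Java model of those two data shapes using java.util collections only. It does not call Spark, and the Item class with its date and store fields is a hypothetical stand-in for Article.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class PairVsGroupModel {
    // Hypothetical stand-in for Article: only the fields needed to build a key.
    static final class Item {
        final String date, store;
        Item(String date, String store) { this.date = date; this.store = store; }
    }

    // RDD style: every element becomes an explicit (key, value) pair, like mapToPair.
    static List<Map.Entry<String, Item>> toPairs(List<Item> items, Function<Item, String> keyFn) {
        List<Map.Entry<String, Item>> pairs = new ArrayList<>();
        for (Item it : items) {
            pairs.add(new SimpleEntry<>(keyFn.apply(it), it));
        }
        return pairs;
    }

    // Dataset style: only a key extractor is supplied and elements stay whole in their group,
    // loosely like groupByKey(MapFunction, Encoder) producing a KeyValueGroupedDataset.
    static Map<String, List<Item>> groupByKey(List<Item> items, Function<Item, String> keyFn) {
        return items.stream().collect(
            Collectors.groupingBy(keyFn, LinkedHashMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
            new Item("2016-08-01", "S1"), new Item("2016-08-01", "S1"), new Item("2016-08-02", "S2"));
        Function<Item, String> keyFn = it -> it.date + "|" + it.store;
        System.out.println(toPairs(items, keyFn).size());      // one pair per element
        System.out.println(groupByKey(items, keyFn).keySet()); // one entry per distinct key
    }
}
```

In other words, groupByKey already does the "pairing" step internally, so there is usually no separate mapToPair stage on the Dataset side.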
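The corresponding questions are:
1. Is JavaRDD.mapToPair equivalent to the Dataset.groupByKey method?
2. Does JavaPairRDD map to KeyValueGroupedDataset?
3. Which method is equivalent to JavaPairRDD.aggregateByKey?
In any case, I want to port the following RDD code to Datasets: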
JavaRDD<Article> goodRdd = ...
JavaPairRDD<String, Article> ArticlePairRdd = goodRdd.mapToPair(new PairFunction<Article, String, Article>() { // Build PairRDD<<Date|Store|Transaction><Article>>
public Tuple2<String, Article> call(Article article) throws Exception {
String key = article.getKeyDate() + "|" + article.getKeyStore() + "|" + article.getKeyTransaction() + "|" + article.getCounter();
return new Tuple2<String, Article>(key, article);
}
});
JavaPairRDD<String, String> transactionRdd = ArticlePairRdd.aggregateByKey("", // Aggregate distributed data -> PairRDD<String, String>
new Function2<String, Article, String>() {
public String call(String oldString, Article newArticle) throws Exception {
String articleString = newArticle.getOwg() + "_" + newArticle.getTextOwg(); // <<Date|Store|Transaction><owg_textOwg###owg_textOwg>>
return oldString + "###" + articleString;
}
},
new Function2<String, String, String>() {
public String call(String a, String b) throws Exception {
String c = a.concat(b);
...
return c;
}
}
);
My code so far looks like this:
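To make explicit what the aggregateByKey call above computes: for each key, Spark folds the values within each partition into the zero value ("") with the first Function2 (seqOp), then merges the per-partition results with the second Function2 (combOp). The sketch below models that contract in plain Java without Spark. The two hand-made "partitions" and the Item class (key, owg, textOwg) are hypothetical, and the elided "..." in the original combOp is not reproduced, so plain concatenation is assumed.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

class AggregateByKeyModel {
    // Hypothetical stand-in for Article with just the fields the seqOp reads.
    static final class Item {
        final String key, owg, textOwg;
        Item(String key, String owg, String textOwg) { this.key = key; this.owg = owg; this.textOwg = textOwg; }
    }

    // Models aggregateByKey(zero, seqOp, combOp): fold each partition per key, then merge partials.
    // The zero value is shared here, which is only safe because String is immutable.
    static <U> Map<String, U> aggregateByKey(List<List<Item>> partitions, U zero,
                                             BiFunction<U, Item, U> seqOp, BiFunction<U, U, U> combOp) {
        Map<String, U> result = new LinkedHashMap<>();
        for (List<Item> partition : partitions) {
            // Per-partition accumulators, each key starting from the zero value.
            Map<String, U> partial = new LinkedHashMap<>();
            for (Item item : partition) {
                partial.put(item.key, seqOp.apply(partial.getOrDefault(item.key, zero), item));
            }
            // Merge this partition's partial results into the totals with combOp.
            partial.forEach((k, v) -> result.merge(k, v, combOp));
        }
        return result;
    }

    // The question's seqOp and combOp as lambdas (the elided part of combOp is omitted).
    static Map<String, String> concatOwg(List<List<Item>> partitions) {
        return aggregateByKey(partitions, "",
            (acc, a) -> acc + "###" + a.owg + "_" + a.textOwg,
            (a, b) -> a.concat(b));
    }

    public static void main(String[] args) {
        List<List<Item>> partitions = Arrays.asList(
            Arrays.asList(new Item("k1", "o1", "t1"), new Item("k1", "o2", "t2")),
            Arrays.asList(new Item("k1", "o3", "t3"), new Item("k2", "o4", "t4")));
        System.out.println(concatOwg(partitions));
        // {k1=###o1_t1###o2_t2###o3_t3, k2=###o4_t4}
    }
}
```

Because every partial string starts with "###", plain concatenation in combOp keeps the separators consistent; the trade-off is that every result carries a leading "###".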
Dataset<Article> goodDS = ...
KeyValueGroupedDataset<String, Article> ArticlePairDS = goodDS.groupByKey(new MapFunction<Article, String>() {
public String call(Article article) throws Exception {
String key = article.getKeyDate() + "|" + article.getKeyStore() + "|" + article.getKeyTransaction() + "|" + article.getCounter();
return key;
}
}, Encoders.STRING());
// Here I need something similar to aggregateByKey, not reduceByKey, because I need to return a different data type (String) than the input (Article)
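On the Dataset side, KeyValueGroupedDataset has no aggregateByKey, but a different output type can be produced by folding each group: mapGroups takes a MapGroupsFunction<K, V, U> whose call(K key, Iterator<V> values) returns one U per key, plus an Encoder<U> (for a String/String result, something like Encoders.tuple(Encoders.STRING(), Encoders.STRING())). Unlike aggregateByKey, mapGroups does not do partial (map-side) aggregation, so all values are shuffled; Spark 2.x's typed Aggregator (zero, reduce, merge, finish) used via agg(aggregator.toColumn()) is the closer analogue when that matters. The sketch below only models the mapGroups-style fold in plain Java; GroupFold is a hypothetical local interface mirroring MapGroupsFunction's shape, Item is a hypothetical stand-in for Article, and no Spark code is called.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class MapGroupsModel {
    // Hypothetical local interface shaped like Spark's MapGroupsFunction: one result per key.
    interface GroupFold<K, V, R> {
        R call(K key, Iterator<V> values);
    }

    // Hypothetical stand-in for Article.
    static final class Item {
        final String key, owg, textOwg;
        Item(String key, String owg, String textOwg) { this.key = key; this.owg = owg; this.textOwg = textOwg; }
    }

    // Models groupByKey(keyFn).mapGroups(fold): bucket values by key, then call the fold once per group.
    static <K, V, R> List<R> mapGroups(List<V> data, Function<V, K> keyFn, GroupFold<K, V, R> fold) {
        Map<K, List<V>> groups = new LinkedHashMap<>();
        for (V v : data) {
            groups.computeIfAbsent(keyFn.apply(v), k -> new ArrayList<>()).add(v);
        }
        List<R> out = new ArrayList<>();
        for (Map.Entry<K, List<V>> e : groups.entrySet()) {
            out.add(fold.call(e.getKey(), e.getValue().iterator()));
        }
        return out;
    }

    // Article-like values in, one (key, "###owg_textOwg...") pair out per key.
    static final GroupFold<String, Item, Map.Entry<String, String>> JOIN_OWG = (key, it) -> {
        StringBuilder sb = new StringBuilder();
        while (it.hasNext()) {
            Item a = it.next();
            sb.append("###").append(a.owg).append('_').append(a.textOwg);
        }
        return new AbstractMap.SimpleEntry<>(key, sb.toString());
    };

    public static void main(String[] args) {
        List<Item> data = Arrays.asList(
            new Item("k1", "o1", "t1"), new Item("k2", "o2", "t2"), new Item("k1", "o3", "t3"));
        System.out.println(mapGroups(data, a -> a.key, JOIN_OWG));
        // [k1=###o1_t1###o3_t3, k2=###o2_t2]
    }
}
```

Since the whole group arrives as one iterator, the fold needs no combOp; that is exactly the step partial aggregation would have required.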