sim*_*yun 6 apache-spark apache-spark-sql
我有2个spark RDD,dataRDD和newPairDataRDD,它们用于spark SQL查询.当我的应用程序初始化时,dataRDD将被初始化.一个指定的hbase实体中的所有数据都将存储到dataRDD.
当客户端的SQL查询到来时,我的APP将获得所有新的更新并插入newPairDataRDD.dataRDD联合newPairDataRDD并在spark SQL上下文中注册为表.
我在dataRDD中找到了0条记录,在newPairDataRDD中找到了1条新的插入记录.工会需要4秒钟.那太慢了
我认为这是不合理的.谁知道怎么做得更快?谢谢简单的代码如下
// Step1: load all data from hbase to dataRDD when initial, this only run once.
JavaPairRDD<String, Row> dataRDD= getAllBaseDataToJavaRDD();
dataRDD.cache();
dataRDD.persist(StorageLevel.MEMORY_ONLY());
logger.info(dataRDD.count());
// Step2: when spark sql query coming, load latest updated and inserted data from db to newPairDataRDD
JavaPairRDD<String, Row> newPairDataRDD = getUpdateOrInstertBaseDataToJavaRDD();
// Step3: if count>0 do union and reduce
if(newPairDataRDD.count() > 0) {
JavaPairRDD<String, Row> unionedRDD =dataRDD.union(newPairDataRDD);
// if data was updated in DB, need to delete the old version from the dataRDD.
dataRDD = unionedRDD.reduceByKey(
new Function2<Row, Row, Row>() {
// @Override
public Row call(Row r1, Row r2) {
return r2;
}
});
}
//step4: register the dataRDD
JavaSchemaRDD schemaRDD = sqlContext.applySchema(dataRDD..values(), schema);
//step5: execute sql query
retRDD = sqlContext.sql(sql);
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
Run Code Online (Sandbox Code Playgroud)
从火花web ui,我可以在下面看到.显然它需要4s才能结合
完成阶段(8)
StageId描述提交的持续时间任务:成功/总输入随机读取随机写入
6收集于SparkPlan.scala:85 +详情2015年4月4日8:17 2 s 8-Aug 156.0 B.
7联盟SparkSqlQueryForMarsNew.java:389+details 1/4/2015 8:17 4 s 8-Aug 64.0 B 156.0 B
实现您想要的效果的更有效方法是使用 acogroup()和 a flatMapValues(),使用 union 除了向 中添加新分区外几乎没有什么作用dataRDD,这意味着所有数据必须在 之前进行混洗reduceByKey()。Acogroup()并且flatMapValues()只会导致newPairDataRDD.
JavaPairRDD<String, Tuple2<List<Row>, List<Row>>> unionedRDD = dataRDD.cogroup(newPairDataRDD);
JavaPairRDD<String, Row> updated = unionedRDD.flatMapValues(
new Function<Tuple2<List<Row>, List<Row>>, Iterable<Row>>() {
public Iterable<Row> call(Tuple2<List<Row>, List<Row>> grouped) {
if (grouped._2.nonEmpty()) {
return grouped._2;
} else {
return grouped._1;
}
}
});
Run Code Online (Sandbox Code Playgroud)
或者在斯卡拉中
val unioned = dataRDD.cogroup(newPairDataRDD)
val updated = unioned.flatMapValues { case (oldVals, newVals) =>
if (newVals.nonEmpty) newVals else oldVals
}
Run Code Online (Sandbox Code Playgroud)
免责声明,我不习惯用Java编写spark!以上如有错误还请大家指正!
| 归档时间: |
|
| 查看次数: |
3594 次 |
| 最近记录: |