我想使用lambda函数来计算(JavaPairRDD<Integer, Double> pairs)的键的平均值.出于这个原因,我开发了以下代码:
java.util.function.Function<Double, Tuple2<Double, Integer>> createAcc = x -> new Tuple2<Double, Integer>(x, 1);
BiFunction<Tuple2<Double, Integer>, Double, Tuple2<Double, Integer>> addAndCount = (Tuple2<Double, Integer> x, Double y) -> { return new Tuple2(x._1()+y, x._2()+1 ); };
BiFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>> combine = (Tuple2<Double, Integer> x, Tuple2<Double, Integer> y) -> { return new Tuple2(x._1()+y._1(), x._2()+y._2() ); };
JavaPairRDD<Integer, Tuple2<Double, Integer>> avgCounts = pairs.combineByKey(createAcc, addAndCount, combine);
Run Code Online (Sandbox Code Playgroud)
但是,eclipse显示了这个错误:
The method combineByKey(Function<Double,C>, Function2<C,Double,C>, Function2<C,C,C>) in the type JavaPairRDD<Integer,Double> …Run Code Online (Sandbox Code Playgroud) 我想使用服务器端数据选择和使用cassandraspark连接器进行过滤.事实上,我们有许多传感器每1秒发送一次值,我们对使用数月,日,小时等的这些数据聚合感兴趣,我提出了以下数据模型:
CREATE TABLE project1(
year int,
month int,
load_balancer int,
day int,
hour int,
estimation_time timestamp,
sensor_id int,
value double,
...
PRIMARY KEY ((year, month, load_balancer), day, hour, estimation_time, sensor_id)
Run Code Online (Sandbox Code Playgroud)
然后,我们有兴趣获得2014年12月的负载均衡器IN(0,1,2,3)的数据汇总.所以它们是4个不同的分区.
我们使用的是cassandraspark连接器版本1.1.1,我们使用了一个按查询组合来获取所有值的平均值按小时汇总.
因此处理时间为4,341,390个元组,spark需要11分钟才能返回结果.现在的问题是我们正在使用5个节点,但是spark 只使用一个worker来执行任务.您能否建议更新查询或数据模型以提高性能?