Apache Spark pyspark.RDDAPI文档提到groupByKey()效率低下.相反,它是推荐使用reduceByKey(),aggregateByKey(),combineByKey(),或foldByKey()代替.这将导致在shuffle之前在worker中进行一些聚合,从而减少跨工作人员的数据混乱.
给定以下数据集和groupByKey()表达式,什么是等效且有效的实现(减少的跨工作者数据混洗),它不使用groupByKey(),但提供相同的结果?
dataset = [("a", 7), ("b", 3), ("a", 8)]
rdd = (sc.parallelize(dataset)
.groupByKey())
print sorted(rdd.mapValues(list).collect())
Run Code Online (Sandbox Code Playgroud)
输出:
[('a', [7, 8]), ('b', [3])]
Run Code Online (Sandbox Code Playgroud) 我使用以下代码创建表:
CREATE KEYSPACE mykeyspace
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
USE mykeyspace;
CREATE TABLE users (
user_id int PRIMARY KEY,
fname text,
lname text
);
INSERT INTO users (user_id, fname, lname)
VALUES (1745, 'john', 'smith');
INSERT INTO users (user_id, fname, lname)
VALUES (1744, 'john', 'doe');
INSERT INTO users (user_id, fname, lname)
VALUES (1746, 'john', 'smith');
Run Code Online (Sandbox Code Playgroud)
我想找到lname列的不同值(不是PRIMARY KEY).我想得到以下结果:
lname
-------
smith
Run Code Online (Sandbox Code Playgroud)
通过使用SELECT DISTINCT lname FROM users;
但是因为lname不是PRIMARY KEY我得到以下错误:
InvalidRequest: …Run Code Online (Sandbox Code Playgroud) 我有一个RDD,它太大而不能一致地执行一个不同的语句而没有虚假错误(例如,SparkException阶段失败4次,ExecutorLostFailure,HDFS文件系统关闭,最大执行程序失败次数,阶段因SparkContext关闭而被取消,等等)
我试图计算特定列中的不同ID,例如:
print(myRDD.map(a => a._2._1._2).distinct.count())
Run Code Online (Sandbox Code Playgroud)
是否有一种简单,一致,不太随机密集的方式来执行上面的命令,可能使用mapPartitions,reduceByKey,flatMap或其他使用较少shuffle而不是不同的命令?