我正在编写一个Spark应用程序,并希望将一组键值对组合(K, V1), (K, V2), ..., (K, Vn)成一个Key-Multivalue对(K, [V1, V2, ..., Vn]).我觉得我应该能够使用reduceByKey具有某种风味的功能来做到这一点:
My_KMV = My_KV.reduce(lambda a, b: a.append([b]))
Run Code Online (Sandbox Code Playgroud)
发生这种情况时我得到的错误是:
'NoneType'对象没有attribue'追加'.
我的键是整数,值V1,...,Vn是元组.我的目标是使用密钥和值列表(元组)创建一对.
定义说:
RDD是不可变的分布式对象集合
我不太明白这是什么意思.是否像存储在硬盘上的数据(分区对象)那么如何RDD可以拥有用户定义的类(如java,scala或python)
从这个链接:https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch03.html它提到:
用户以两种方式创建RDD:通过加载外部数据集,或通过在其驱动程序中分发对象集合(例如,列表或集合)
我很难理解RDD的一般情况以及与spark和hadoop的关系.
请有人帮忙.
有没有办法RDD在spark中连接两个不同s的数据集?
要求是 - 我使用具有相同列名的scala创建两个中间RDD,需要组合这两个RDD的结果并缓存访问UI的结果.如何在此处组合数据集?
RDD属于类型 spark.sql.SchemaRDD
scala distributed-computing apache-spark rdd apache-spark-sql
我正在寻找一种方法将RDD分成两个或更多RDD.我见过的最接近的是Scala Spark:拆分收集到几个RDD?这仍然是一个RDD.
如果您熟悉SAS,请执行以下操作:
data work.split1, work.split2;
set work.preSplit;
if (condition1)
output work.split1
else if (condition2)
output work.split2
run;
Run Code Online (Sandbox Code Playgroud)
这导致了两个不同的数据集.它必须立即坚持以获得我打算的结果......
在我的猪代码中,我这样做:
all_combined = Union relation1, relation2,
relation3, relation4, relation5, relation 6.
Run Code Online (Sandbox Code Playgroud)
我想用火花做同样的事情.然而,不幸的是,我发现我必须继续这样做:
first = rdd1.union(rdd2)
second = first.union(rdd3)
third = second.union(rdd4)
# .... and so on
Run Code Online (Sandbox Code Playgroud)
是否有一个联合运算符可以让我一次操作多个rdds:
例如 union(rdd1, rdd2,rdd3, rdd4, rdd5, rdd6)
这是一个方便的问题.
读取Spark方法sortByKey:
sortByKey([ascending], [numTasks]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
Run Code Online (Sandbox Code Playgroud)
是否可以返回"N"个数量的结果.因此,不要返回所有结果,只返回前10位.我可以将已排序的集合转换为数组并使用take方法,但由于这是一个O(N)操作,是否有更有效的方法?
SparkContext, JavaSparkContext, SQLContext和之间有什么区别SparkSession?SparkSession?转换或创建Context ?SparkSession吗?SQLContext,SparkContext和JavaSparkContext也SparkSession?parallelize在SparkContext和中有不同的行为JavaSparkContext.他们是如何表现的SparkSession?如何使用SparkSession?创建以下内容?
RDDJavaRDDJavaPairRDDDataset有没有一种方法可以将a JavaPairRDD转换为a Dataset或Dataseta JavaPairRDD?
我有一个RDD,我想将其转换为pandas dataframe.我知道要转换,我们可以做到RDD正常dataframe
df = rdd1.toDF()
Run Code Online (Sandbox Code Playgroud)
但我想转换RDD为pandas dataframe而不是正常dataframe.我该怎么做?
我使用Spark 1.0.1处理大量数据.每行包含一个ID号,一些包含重复的ID.我想在同一位置保存具有相同ID号的所有行,但我无法有效地执行此操作.我创建了(ID号,数据行)对的RDD [(String,String)]:
val mapRdd = rdd.map{ x=> (x.split("\\t+")(1), x)}
Run Code Online (Sandbox Code Playgroud)
一种有效但不具备性能的方法是收集ID号,过滤每个ID的RDD,并使用与文本文件相同的ID保存值的RDD.
val ids = rdd.keys.distinct.collect
ids.foreach({ id =>
val dataRows = mapRdd.filter(_._1 == id).values
dataRows.saveAsTextFile(id)
})
Run Code Online (Sandbox Code Playgroud)
我还尝试了groupByKey或reduceByKey,以便RDD中的每个元组包含一个唯一的ID号作为键,以及由该ID号的新行分隔的一组组合数据行.我想只使用foreach迭代RDD一次来保存数据,但是它不能将值作为RDD给出
groupedRdd.foreach({ tup =>
val data = sc.parallelize(List(tup._2)) //nested RDD does not work
data.saveAsTextFile(tup._1)
})
Run Code Online (Sandbox Code Playgroud)
基本上,我想通过ID号将RDD拆分为多个RDD,并将该ID号的值保存到它们自己的位置.
在Pyspark中,我可以从列表中创建RDD并确定要有多少分区:
sc = SparkContext()
sc.parallelize(xrange(0, 10), 4)
Run Code Online (Sandbox Code Playgroud)
我决定对RDD进行分区的分区数量如何影响性能?这取决于我的机器核心数量如何?