标签: rdd

使用Apache Spark将键值对减少为键列表对

我正在编写一个Spark应用程序,并希望将一组键值对组合(K, V1), (K, V2), ..., (K, Vn)成一个Key-Multivalue对(K, [V1, V2, ..., Vn]).我觉得我应该能够使用reduceByKey具有某种风味的功能来做到这一点:

My_KMV = My_KV.reduce(lambda a, b: a.append([b]))
Run Code Online (Sandbox Code Playgroud)

发生这种情况时我得到的错误是:

'NoneType'对象没有attribue'追加'.

我的键是整数,值V1,...,Vn是元组.我的目标是使用密钥和值列表(元组)创建一对.

python mapreduce apache-spark rdd pyspark

42
推荐指数
4
解决办法
8万
查看次数

什么是火花中的RDD

定义说:

RDD是不可变的分布式对象集合

我不太明白这是什么意思.是否像存储在硬盘上的数据(分区对象)那么如何RDD可以拥有用户定义的类(如java,scala或python)

从这个链接:https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch03.html它提到:

用户以两种方式创建RDD:通过加载外部数据集,或通过在其驱动程序中分发对象集合(例如,列表或集合)

我很难理解RDD的一般情况以及与spark和hadoop的关系.

请有人帮忙.

hadoop scala apache-spark rdd

40
推荐指数
5
解决办法
3万
查看次数

使用scala连接Apache spark中不同RDD的数据集

有没有办法RDD在spark中连接两个不同s的数据集?

要求是 - 我使用具有相同列名的scala创建两个中间RDD,需要组合这两个RDD的结果并缓存访问UI的结果.如何在此处组合数据集?

RDD属于类型 spark.sql.SchemaRDD

scala distributed-computing apache-spark rdd apache-spark-sql

35
推荐指数
1
解决办法
4万
查看次数

如何将RDD拆分为两个或更多RDD?

我正在寻找一种方法将RDD分成两个或更多RDD.我见过的最接近的是Scala Spark:拆分收集到几个RDD?这仍然是一个RDD.

如果您熟悉SAS,请执行以下操作:

data work.split1, work.split2;
    set work.preSplit;

    if (condition1)
        output work.split1
    else if (condition2)
        output work.split2
run;
Run Code Online (Sandbox Code Playgroud)

这导致了两个不同的数据集.它必须立即坚持以获得我打算的结果......

apache-spark rdd pyspark

35
推荐指数
2
解决办法
4万
查看次数

多个RDD的Spark联合

在我的猪代码中,我这样做:

all_combined = Union relation1, relation2, 
    relation3, relation4, relation5, relation 6.
Run Code Online (Sandbox Code Playgroud)

我想用火花做同样的事情.然而,不幸的是,我发现我必须继续这样做:

first = rdd1.union(rdd2)
second = first.union(rdd3)
third = second.union(rdd4)
# .... and so on
Run Code Online (Sandbox Code Playgroud)

是否有一个联合运算符可以让我一次操作多个rdds:

例如 union(rdd1, rdd2,rdd3, rdd4, rdd5, rdd6)

这是一个方便的问题.

python apache-spark rdd pyspark

35
推荐指数
1
解决办法
6万
查看次数

如何在Scala Spark中对RDD进行排序?

读取Spark方法sortByKey:

sortByKey([ascending], [numTasks])   When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
Run Code Online (Sandbox Code Playgroud)

是否可以返回"N"个数量的结果.因此,不要返回所有结果,只返回前10位.我可以将已排序的集合转换为数组并使用take方法,但由于这是一个O(N)操作,是否有更有效的方法?

scala apache-spark rdd

34
推荐指数
3
解决办法
4万
查看次数

SparkContext,JavaSparkContext,SQLContext和SparkSession之间的区别?

  1. SparkContext, JavaSparkContext, SQLContext和之间有什么区别SparkSession
  2. 是否有任何方法可以使用SparkSession?转换或创建Context ?
  3. 我可以使用一个条目完全替换所有上下文SparkSession吗?
  4. 在所有的功能SQLContext,SparkContextJavaSparkContextSparkSession
  5. 有些功能parallelizeSparkContext和中有不同的行为JavaSparkContext.他们是如何表现的SparkSession
  6. 如何使用SparkSession?创建以下内容?

    • RDD
    • JavaRDD
    • JavaPairRDD
    • Dataset

有没有一种方法可以将a JavaPairRDD转换为a DatasetDataseta JavaPairRDD

java scala apache-spark rdd apache-spark-dataset

34
推荐指数
3
解决办法
1万
查看次数

如何在ipython中将Spark RDD转换为pandas数据帧?

我有一个RDD,我想将其转换为pandas dataframe.我知道要转换,我们可以做到RDD正常dataframe

df = rdd1.toDF()
Run Code Online (Sandbox Code Playgroud)

但我想转换RDDpandas dataframe而不是正常dataframe.我该怎么做?

python ipython pandas rdd pyspark

32
推荐指数
2
解决办法
7万
查看次数

Apache Spark:按键将RDD分成多个RDD以保存值

我使用Spark 1.0.1处理大量数据.每行包含一个ID号,一些包含重复的ID.我想在同一位置保存具有相同ID号的所有行,但我无法有效地执行此操作.我创建了(ID号,数据行)对的RDD [(String,String)]:

val mapRdd = rdd.map{ x=> (x.split("\\t+")(1), x)} 
Run Code Online (Sandbox Code Playgroud)

一种有效但不具备性能的方法是收集ID号,过滤每个ID的RDD,并使用与文本文件相同的ID保存值的RDD.

val ids = rdd.keys.distinct.collect
ids.foreach({ id =>
    val dataRows = mapRdd.filter(_._1 == id).values
    dataRows.saveAsTextFile(id)
})
Run Code Online (Sandbox Code Playgroud)

我还尝试了groupByKey或reduceByKey,以便RDD中的每个元组包含一个唯一的ID号作为键,以及由该ID号的新行分隔的一组组合数据行.我想只使用foreach迭代RDD一次来保存数据,但是它不能将值作为RDD给出

groupedRdd.foreach({ tup =>
  val data = sc.parallelize(List(tup._2)) //nested RDD does not work
  data.saveAsTextFile(tup._1)
})
Run Code Online (Sandbox Code Playgroud)

基本上,我想通过ID号将RDD拆分为多个RDD,并将该ID号的值保存到它们自己的位置.

filter apache-spark rdd

31
推荐指数
1
解决办法
3万
查看次数

RDD中的分区数和Spark中的性能

在Pyspark中,我可以从列表中创建RDD并确定要有多少分区:

sc = SparkContext()
sc.parallelize(xrange(0, 10), 4)
Run Code Online (Sandbox Code Playgroud)

我决定对RDD进行分区的分区数量如何影响性能?这取决于我的机器核心数量如何?

performance apache-spark rdd pyspark

31
推荐指数
2
解决办法
3万
查看次数