标签: rdd

在Scala Spark中找不到reduceByKey方法

试图从源代码运行http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala.

这一行:

val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
Run Code Online (Sandbox Code Playgroud)

投掷错误

value reduceByKey is not a member of org.apache.spark.rdd.RDD[(String, Int)]
  val wordCounts = logData.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
Run Code Online (Sandbox Code Playgroud)

logData.flatMap(line => line.split(" ")).map(word => (word, 1))返回MappedRDD,但我在http://spark.apache.org/docs/0.9.1/api/core/index.html#org.apache.spark.rdd.RDD中找不到此类型

我从Spark源代码运行此代码,因此可能是类路径问题?但是必需的依赖项在我的类路径上.

scala apache-spark rdd

24
推荐指数
1
解决办法
1万
查看次数

在Spark中将简单的一行字符串转换为RDD

我有一个简单的路线:

line = "Hello, world"
Run Code Online (Sandbox Code Playgroud)

我想将它转换为只有一个元素的RDD.我试过了

sc.parallelize(line)
Run Code Online (Sandbox Code Playgroud)

但它得到:

sc.parallelize(line).collect()
['H', 'e', 'l', 'l', 'o', ',', ' ', 'w', 'o', 'r', 'l', 'd']
Run Code Online (Sandbox Code Playgroud)

有任何想法吗?

python distributed-computing apache-spark rdd pyspark

23
推荐指数
1
解决办法
4万
查看次数

Apache Spark中的DataFrame相等

假设df1并且df2DataFrameApache Spark 中的两个,使用两种不同的机制计算,例如,Spark SQL与Scala/Java/Python API.

是否存在一种惯用的方法来确定两个数据帧是否相等(相等,同构),其中等价由数据确定(每行的列名和列值)是否相同,除了行和列的排序?

这个问题的动机是,通常有很多方法来计算一些大数据结果,每种方法都有自己的权衡.在探讨这些权衡时,重要的是要保持正确性,因此需要检查有意义的测试数据集的等效性/相等性.

scala dataframe apache-spark rdd apache-spark-sql

23
推荐指数
7
解决办法
2万
查看次数

联合分区RDD的连接是否会导致Apache Spark的混乱?

rdd1.join(rdd2)如果rdd1rdd2拥有相同的分区,会导致洗牌吗?

apache-spark spark-streaming rdd

22
推荐指数
1
解决办法
1万
查看次数

Spark RDD - 它们是如何工作的

我有一个小型Scala程序,可以在单个节点上运行.但是,我正在扩展它,因此它在多个节点上运行.这是我的第一次尝试.我只是想了解RDD如何在Spark中工作,所以这个问题是基于理论的,可能不是100%正确.

假设我创建了一个RDD: val rdd = sc.textFile(file)

现在,一旦我这样做了,这是否意味着文件at file现在在节点之间进行分区(假设所有节点都可以访问文件路径)?

其次,我想计算RDD中的对象数量(足够简单),但是,我需要在需要应用于RDD中的对象的计算中使用该数字 - 伪代码示例:

rdd.map(x => x / rdd.size)
Run Code Online (Sandbox Code Playgroud)

假设有100个对象rdd,并且说有10个节点,因此每个节点有10个对象的计数(假设这是RDD概念的工作方式),现在当我调用该方法时,每个节点将使用rdd.sizeas 执行计算10还是100?因为,总的来说,RDD是大小100但在每个节点上本地只是10.我是否需要在进行计算之前制作广播变量?这个问题与下面的问题有关.

最后,如果我转换到RDD,例如rdd.map(_.split("-")),然后我想要新size的RDD,我是否需要在RDD上执行操作,例如count(),所以所有信息都被发送回驱动程序节点?

scala distributed-computing bigdata apache-spark rdd

21
推荐指数
2
解决办法
4777
查看次数

Spark:Shuffle Write,Shuffle spill(内存),Shuffle spill(磁盘)之间的区别?

我有以下火花工作,试图将一切都记在内存中:

val myOutRDD = myInRDD.flatMap { fp =>
  val tuple2List: ListBuffer[(String, myClass)] = ListBuffer()
        :

  tuple2List
}.persist(StorageLevel.MEMORY_ONLY).reduceByKey { (p1, p2) =>
   myMergeFunction(p1,p2)
}.persist(StorageLevel.MEMORY_ONLY)
Run Code Online (Sandbox Code Playgroud)

但是,当我查看作业跟踪器时,我仍然有很多Shuffle Write和Shuffle溢出到磁盘......

Total task time across all tasks: 49.1 h
Input Size / Records: 21.6 GB / 102123058
Shuffle write: 532.9 GB / 182440290
Shuffle spill (memory): 370.7 GB
Shuffle spill (disk): 15.4 GB
Run Code Online (Sandbox Code Playgroud)

然后这个工作失败了因为"no space left on device"......我想知道532.9 GB Shuffle写在这里,是写入磁盘还是内存?

另外,为什么还有15.4 G数据溢出到磁盘,而我特意要求将它们保存在内存中?

谢谢!

shuffle persist apache-spark rdd

21
推荐指数
3
解决办法
2万
查看次数

如何更新RDD?

我们正在开发Spark框架,其中我们将历史数据移动到RDD集合中.

基本上,RDD是我们进行操作的不可变的只读数据集.基于此,我们已将历史数据移至RDD,并在此类RDD上进行过滤/映射等计算.

现在有一个用例,RDD中的数据子集得到更新,我们必须重新计算这些值.

HistoricalData采用RDD的形式.我根据请求范围创建另一个RDD,并在ScopeCollection中保存该RDD的引用

到目前为止,我已经能够想到以下方法 -

方法1:广播变化:

  1. 对于每个更改请求,我的服务器获取特定于范围的RDD并生成作业
  2. 在工作中,在该RDD上应用地图阶段 -

    2.a. 对于RDD中的每个节点,在广播上进行查找并创建一个现在更新的新值,从而创建一个新的RDD
    2.b. 现在我在step2.a上再次对这个新的RDD进行所有计算.像乘法,减少等
    2.c. 我将此RDDs引用保存在我的ScopeCollection中

方法2:为更新创建RDD

  1. 对于每个更改请求,我的服务器获取特定于范围的RDD并生成作业
  2. 在每个RDD上,使用具有更改的新RDD进行连接
  3. 现在我在步骤2中再次对这个新的RDD进行所有计算,如乘法,减少等

方法3:

我曾想过创建流RDD,我不断更新相同的RDD并进行重新计算.但据我所知,它可以从Flume或Kafka获取流.而在我的情况下,值是基于用户交互在应用程序本身中生成的.因此,我无法在上下文中看到流RDD的任何集成点.

关于哪种方法更好或任何其他适合此方案的方法的任何建议.

TIA!

apache-spark spark-streaming rdd

19
推荐指数
1
解决办法
5260
查看次数

Apache Spark中的矩阵乘法

我正在尝试使用Apache Spark和Java执行矩阵乘法.

我有两个主要问题:

  1. 如何创建可以代表Apache Spark中的矩阵的RDD?
  2. 如何将两个这样的RDD相乘?

java scala apache-spark rdd apache-spark-mllib

19
推荐指数
1
解决办法
1万
查看次数

spark中的哪个函数用于按键组合两个RDD

假设我有以下两个RDD,具有以下键对值.

rdd1 = [ (key1, [value1, value2]), (key2, [value3, value4]) ]
Run Code Online (Sandbox Code Playgroud)

rdd2 = [ (key1, [value5, value6]), (key2, [value7]) ]
Run Code Online (Sandbox Code Playgroud)

现在,我想通过键值加入它们,所以例如我想返回以下内容

ret = [ (key1, [value1, value2, value5, value6]), (key2, [value3, value4, value7]) ] 
Run Code Online (Sandbox Code Playgroud)

我如何使用Python或Scala在spark中执行此操作?一种方法是使用join,但join会在元组内部创建一个元组.但我希望每个键值对只有一个元组.

python scala bigdata apache-spark rdd

18
推荐指数
2
解决办法
3万
查看次数

将RDD初始化为空

我有一个RDD叫

JavaPairRDD<String, List<String>> existingRDD; 
Run Code Online (Sandbox Code Playgroud)

现在我需要将其初始化 existingRDD为空,这样当我得到实际的rdd时,我可以用它做一个联合existingRDD.existingRDD除了将其初始化为null之外,如何初始化为空RDD?这是我的代码:

JavaPairRDD<String, List<String>> existingRDD;
if(ai.get()%10==0)
{
    existingRDD.saveAsNewAPIHadoopFile("s3://manthan-impala-test/kinesis-dump/" + startTime + "/" + k + "/" + System.currentTimeMillis() + "/",
    NullWritable.class, Text.class, TextOutputFormat.class); //on worker failure this will get overwritten                                  
}
else
{
    existingRDD.union(rdd);
}
Run Code Online (Sandbox Code Playgroud)

java apache-spark rdd

18
推荐指数
1
解决办法
3万
查看次数