试图从源代码运行http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala.
这一行:
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
Run Code Online (Sandbox Code Playgroud)
投掷错误
value reduceByKey is not a member of org.apache.spark.rdd.RDD[(String, Int)]
val wordCounts = logData.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
Run Code Online (Sandbox Code Playgroud)
logData.flatMap(line => line.split(" ")).map(word => (word, 1))返回MappedRDD,但我在http://spark.apache.org/docs/0.9.1/api/core/index.html#org.apache.spark.rdd.RDD中找不到此类型
我从Spark源代码运行此代码,因此可能是类路径问题?但是必需的依赖项在我的类路径上.
我有一个简单的路线:
line = "Hello, world"
Run Code Online (Sandbox Code Playgroud)
我想将它转换为只有一个元素的RDD.我试过了
sc.parallelize(line)
Run Code Online (Sandbox Code Playgroud)
但它得到:
sc.parallelize(line).collect()
['H', 'e', 'l', 'l', 'o', ',', ' ', 'w', 'o', 'r', 'l', 'd']
Run Code Online (Sandbox Code Playgroud)
有任何想法吗?
假设df1并且df2是DataFrameApache Spark 中的两个,使用两种不同的机制计算,例如,Spark SQL与Scala/Java/Python API.
是否存在一种惯用的方法来确定两个数据帧是否相等(相等,同构),其中等价由数据确定(每行的列名和列值)是否相同,除了行和列的排序?
这个问题的动机是,通常有很多方法来计算一些大数据结果,每种方法都有自己的权衡.在探讨这些权衡时,重要的是要保持正确性,因此需要检查有意义的测试数据集的等效性/相等性.
rdd1.join(rdd2)如果rdd1并rdd2拥有相同的分区,会导致洗牌吗?
我有一个小型Scala程序,可以在单个节点上运行.但是,我正在扩展它,因此它在多个节点上运行.这是我的第一次尝试.我只是想了解RDD如何在Spark中工作,所以这个问题是基于理论的,可能不是100%正确.
假设我创建了一个RDD:
val rdd = sc.textFile(file)
现在,一旦我这样做了,这是否意味着文件at file现在在节点之间进行分区(假设所有节点都可以访问文件路径)?
其次,我想计算RDD中的对象数量(足够简单),但是,我需要在需要应用于RDD中的对象的计算中使用该数字 - 伪代码示例:
rdd.map(x => x / rdd.size)
Run Code Online (Sandbox Code Playgroud)
假设有100个对象rdd,并且说有10个节点,因此每个节点有10个对象的计数(假设这是RDD概念的工作方式),现在当我调用该方法时,每个节点将使用rdd.sizeas 执行计算10还是100?因为,总的来说,RDD是大小100但在每个节点上本地只是10.我是否需要在进行计算之前制作广播变量?这个问题与下面的问题有关.
最后,如果我转换到RDD,例如rdd.map(_.split("-")),然后我想要新size的RDD,我是否需要在RDD上执行操作,例如count(),所以所有信息都被发送回驱动程序节点?
我有以下火花工作,试图将一切都记在内存中:
val myOutRDD = myInRDD.flatMap { fp =>
val tuple2List: ListBuffer[(String, myClass)] = ListBuffer()
:
tuple2List
}.persist(StorageLevel.MEMORY_ONLY).reduceByKey { (p1, p2) =>
myMergeFunction(p1,p2)
}.persist(StorageLevel.MEMORY_ONLY)
Run Code Online (Sandbox Code Playgroud)
但是,当我查看作业跟踪器时,我仍然有很多Shuffle Write和Shuffle溢出到磁盘......
Total task time across all tasks: 49.1 h
Input Size / Records: 21.6 GB / 102123058
Shuffle write: 532.9 GB / 182440290
Shuffle spill (memory): 370.7 GB
Shuffle spill (disk): 15.4 GB
Run Code Online (Sandbox Code Playgroud)
然后这个工作失败了因为"no space left on device"......我想知道532.9 GB Shuffle写在这里,是写入磁盘还是内存?
另外,为什么还有15.4 G数据溢出到磁盘,而我特意要求将它们保存在内存中?
谢谢!
我们正在开发Spark框架,其中我们将历史数据移动到RDD集合中.
基本上,RDD是我们进行操作的不可变的只读数据集.基于此,我们已将历史数据移至RDD,并在此类RDD上进行过滤/映射等计算.
现在有一个用例,RDD中的数据子集得到更新,我们必须重新计算这些值.
HistoricalData采用RDD的形式.我根据请求范围创建另一个RDD,并在ScopeCollection中保存该RDD的引用
到目前为止,我已经能够想到以下方法 -
方法1:广播变化:
方法2:为更新创建RDD
方法3:
我曾想过创建流RDD,我不断更新相同的RDD并进行重新计算.但据我所知,它可以从Flume或Kafka获取流.而在我的情况下,值是基于用户交互在应用程序本身中生成的.因此,我无法在上下文中看到流RDD的任何集成点.
关于哪种方法更好或任何其他适合此方案的方法的任何建议.
TIA!
我正在尝试使用Apache Spark和Java执行矩阵乘法.
我有两个主要问题:
假设我有以下两个RDD,具有以下键对值.
rdd1 = [ (key1, [value1, value2]), (key2, [value3, value4]) ]
Run Code Online (Sandbox Code Playgroud)
和
rdd2 = [ (key1, [value5, value6]), (key2, [value7]) ]
Run Code Online (Sandbox Code Playgroud)
现在,我想通过键值加入它们,所以例如我想返回以下内容
ret = [ (key1, [value1, value2, value5, value6]), (key2, [value3, value4, value7]) ]
Run Code Online (Sandbox Code Playgroud)
我如何使用Python或Scala在spark中执行此操作?一种方法是使用join,但join会在元组内部创建一个元组.但我希望每个键值对只有一个元组.
我有一个RDD叫
JavaPairRDD<String, List<String>> existingRDD;
Run Code Online (Sandbox Code Playgroud)
现在我需要将其初始化 existingRDD为空,这样当我得到实际的rdd时,我可以用它做一个联合existingRDD.existingRDD除了将其初始化为null之外,如何初始化为空RDD?这是我的代码:
JavaPairRDD<String, List<String>> existingRDD;
if(ai.get()%10==0)
{
existingRDD.saveAsNewAPIHadoopFile("s3://manthan-impala-test/kinesis-dump/" + startTime + "/" + k + "/" + System.currentTimeMillis() + "/",
NullWritable.class, Text.class, TextOutputFormat.class); //on worker failure this will get overwritten
}
else
{
existingRDD.union(rdd);
}
Run Code Online (Sandbox Code Playgroud)