我正在尝试在迭代RDD的元素时向地图添加元素.我没有收到任何错误,但修改没有发生.
这一切都可以直接添加或迭代其他集合:
scala> val myMap = new collection.mutable.HashMap[String,String]
myMap: scala.collection.mutable.HashMap[String,String] = Map()
scala> myMap("test1")="test1"
scala> myMap
res44: scala.collection.mutable.HashMap[String,String] = Map(test1 -> test1)
scala> List("test2", "test3").foreach(w => myMap(w) = w)
scala> myMap
res46: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)
Run Code Online (Sandbox Code Playgroud)
但是当我尝试从RDD做同样的事情时:
scala> val fromFile = sc.textFile("tests.txt")
...
scala> fromFile.take(3)
...
res48: Array[String] = Array(test4, test5, test6)
scala> fromFile.foreach(w => myMap(w) = w)
scala> myMap
res50: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)
Run Code Online (Sandbox Code Playgroud)
我已经尝试在foreach之前打印地图的内容,以确保变量是相同的,并且它打印正确: …
为什么rdd.sample()Spark RDD 上的函数返回不同数量的元素,即使fraction参数相同?例如,如果我的代码如下所示:
val a = sc.parallelize(1 to 10000, 3)
a.sample(false, 0.1).count
Run Code Online (Sandbox Code Playgroud)
每次我运行代码的第二行时,它返回一个不等于1000的不同数字.实际上我希望每次看到1000,尽管1000个元素可能不同.谁能告诉我如何获得样本大小恰好等于1000的样本?非常感谢你.
我在HDFS中有成千上万的小文件.需要处理稍小的文件子集(也是数千个),fileList包含需要处理的文件路径列表.
// fileList == list of filepaths in HDFS
var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD
for (i <- 0 to fileList.size() - 1) {
val filePath = fileStatus.get(i)
val fileRDD = sparkContext.textFile(filePath)
val sampleRDD = fileRDD.filter(line => line.startsWith("#####")).map(line => (filePath, line))
masterRDD = masterRDD.union(sampleRDD)
}
masterRDD.first()
Run Code Online (Sandbox Code Playgroud)
//一旦退出循环,执行任何操作都会导致由于RDD的长谱系导致的堆栈溢出错误
Exception in thread "main" java.lang.StackOverflowError
at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) …Run Code Online (Sandbox Code Playgroud) 为什么Spark中的模式匹配与Scala中的模式匹配不一样?请参阅下面的示例...函数f()尝试在类上进行模式匹配,它在Scala REPL中工作但在Spark中失败并导致所有"???". f2()是一种解决方法,可以在Spark中获得所需的结果.isInstanceOf(),但我知道这是Scala中的错误形式.
任何帮助模式匹配Spark中这种情况下的正确方法将不胜感激.
abstract class a extends Serializable {val a: Int}
case class b(a: Int) extends a
case class bNull(a: Int=0) extends a
val x: List[a] = List(b(0), b(1), bNull())
val xRdd = sc.parallelize(x)
Run Code Online (Sandbox Code Playgroud)
尝试模式匹配在Scala REPL中工作但在Spark中失败
def f(x: a) = x match {
case b(n) => "b"
case bNull(n) => "bnull"
case _ => "???"
}
Run Code Online (Sandbox Code Playgroud)
在Spark中运行的解决方法,但是形式不好(我认为)
def f2(x: a) = {
if (x.isInstanceOf[b]) {
"b"
} else if (x.isInstanceOf[bNull]) {
"bnull"
} else …Run Code Online (Sandbox Code Playgroud) 我在HDFS上有一个文本文件,我想将它转换为Spark中的数据框.
我使用Spark Context加载文件,然后尝试从该文件生成单个列.
val myFile = sc.textFile("file.txt")
val myFile1 = myFile.map(x=>x.split(";"))
Run Code Online (Sandbox Code Playgroud)
执行此操作后,我正在尝试以下操作.
myFile1.toDF()
Run Code Online (Sandbox Code Playgroud)
我遇到了问题,因为myFile1 RDD中的元素现在是数组类型.
我该如何解决这个问题?
我是一个Apache星火学习者和所遇到的一个RDD动作aggregate,我有没有它的功能如何线索.有人可以逐步详细解释并详细解释我们如何在此处得到以下代码的结果
RDD input = {1,2,3,3}
RDD Aggregate function :
rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))
output : {9,4}
Run Code Online (Sandbox Code Playgroud)
谢谢
我们都知道Spark会在内存中进行计算.我只是好奇以下.
如果我RDD从HDFS在我的pySpark shell中创建10 ,它是否意味着所有这些10 RDD秒数据将驻留在Spark Workers Memory上?
如果我不删除RDD,它会永远存在于内存中吗?
如果我的数据集(文件)大小超过可用的RAM大小,那么数据将存储在何处?
我有一个RDD结构
RDD[(String, String)]
Run Code Online (Sandbox Code Playgroud)
我想创建2个列表(rdd的每个维度一个).
我尝试使用rdd.foreach()并填充两个ListBuffers然后将它们转换为Lists,但我猜每个节点都创建自己的ListBuffer,因为在迭代之后BufferLists是空的.我该怎么做 ?
编辑:我的方法
val labeled = data_labeled.map { line =>
val parts = line.split(',')
(parts(5), parts(7))
}.cache()
var testList : ListBuffer[String] = new ListBuffer()
labeled.foreach(line =>
testList += line._1
)
val labeledList = testList.toList
println("rdd: " + labeled.count)
println("bufferList: " + testList.size)
println("list: " + labeledList.size)
Run Code Online (Sandbox Code Playgroud)
结果是:
rdd: 31990654
bufferList: 0
list: 0
Run Code Online (Sandbox Code Playgroud) 特别是,如果我说
rdd3 = rdd1.join(rdd2)
Run Code Online (Sandbox Code Playgroud)
然后当我打电话时rdd3.collect,根据Partitioner使用情况,要么在节点分区之间移动数据,要么在每个分区上本地完成连接(或者,就我所知,完全不同的东西).这取决于RDD论文所谓的"狭义"和"广泛"依赖关系,但谁知道优化器在实践中有多好.
无论如何,我可以从跟踪输出中收集实际发生的事情,但是打电话会很好rdd3.explain.
这样的事情存在吗?
我有一个非常大的pyspark.sql.dataframe.DataFrame,名为df.我需要一些枚举记录的方法 - 因此,能够访问具有特定索引的记录.(或选择索引范围的记录组)
在熊猫中,我可以做到
indexes=[2,3,6,7]
df[indexes]
Run Code Online (Sandbox Code Playgroud)
在这里,我想要类似的东西,(并且不将数据帧转换为pandas)
我能得到的最接近的是:
通过以下方式枚举原始数据框中的所有对象:
indexes=np.arange(df.count())
df_indexed=df.withColumn('index', indexes)
Run Code Online (Sandbox Code Playgroud)
问题:
它会在以后工作,如下所示:
indexes=[2,3,6,7]
df1.where("index in indexes").collect()
Run Code Online (Sandbox Code Playgroud)有没有更快更简单的方法来处理它?