我想弄清楚为什么我的groupByKey返回以下内容:
[(0, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a210>), (1, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a4d0>), (2, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a390>), (3, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a290>), (4, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a450>), (5, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a350>), (6, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a1d0>), (7, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a490>), (8, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a050>), (9, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a650>)]
Run Code Online (Sandbox Code Playgroud)
我有flatMapped值,如下所示:
[(0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D')]
Run Code Online (Sandbox Code Playgroud)
我做的很简单:
groupRDD = columnRDD.groupByKey()
Run Code Online (Sandbox Code Playgroud) 我有以下数据,我想做的是
[(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]
Run Code Online (Sandbox Code Playgroud)
是每个键计数值的实例(1个字符串字符)。所以我首先做了一张地图:
.map(lambda x: (x[0], [x[1], 1]))
Run Code Online (Sandbox Code Playgroud)
现在使其成为以下项的键/元组:
[(13, ['D', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['T', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['T', 1]), (53, ['2', 1]), (54, ['0', …
Run Code Online (Sandbox Code Playgroud) 如果我有一个密钥/值的RDD(密钥是列索引),是否可以将其加载到数据帧中?例如:
(0,1), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)
Run Code Online (Sandbox Code Playgroud)
并使数据框看起来像:
1,2,18
1,10,18
2,20,18
Run Code Online (Sandbox Code Playgroud) 我还没有找到关于这个主题的更多信息,但是我们假设我们使用数据框来读取镶嵌文件中的10块火花自然会创建10个分区.但是当数据帧读入文件以处理它时,它不会处理大数据与分区的比率,因为如果它正在处理未压缩的文件,则块大小会大得多,这意味着分区也会更大.
所以让我澄清一下,压缩木地板(这些数字并不完全准确).1GB Par = 5 Blocks = 5个可以解压缩到5GB的分区,使其成为25个块/ 25个分区.但是,除非你重新分区1GB par文件,否则最好只有5个分区,它将是25个分区?或者我的逻辑错了.
重新分配以增加速度是否有意义?或者我在想这个错误.任何人都可以对此有所了解吗?
假设:
所以我有一个带有16个内核和64GB内存的Spark独立服务器.我在服务器上运行主服务器和工作服务器.我没有启用动态分配.我在Spark 2.0上
我不明白的是,当我提交工作并指明:
--num-executors 2
--executor-cores 2
Run Code Online (Sandbox Code Playgroud)
只应占用4个核心.然而,当提交作业时,它会占用所有16个内核,并且无论如何都会绕过num-executors
参数旋转8个执行程序.但如果我将executor-cores
参数更改为4
它将相应调整,4个执行器将旋转.
我尝试了一些不同的场景来尝试使用Spark的1.3 DataFrame来处理像sciPy kurtosis或numpy std这样的东西.这是示例代码,但它只挂在10x10数据集(10行,10列).我试过了:
print df.groupBy().agg(kurtosis(df.offer_id)).collect()
print df.agg(kurtosis(df.offer_ID)).collect()
Run Code Online (Sandbox Code Playgroud)
但这没有问题:
print df.agg(F.min(df.offer_id), F.min(df.decision_id)).collect()
Run Code Online (Sandbox Code Playgroud)
我的猜测是因为F是:from pyspark.sql import functions as F
是一个编程的sql函数.我如何使用数据帧来处理数据集上的峰度?
这也只是挂起:
print df.map(kurtosis(df.offer_id)).collect()
Run Code Online (Sandbox Code Playgroud) 我想动态生成我的数据框架构我有以下错误:
assert isinstance(dataType, DataType), "dataType should be DataType"
AssertionError: dataType should be DataType
Run Code Online (Sandbox Code Playgroud)
代码:
filteredSchema = []
for line in correctSchema:
fieldName = line.split(',')
if fieldName[1] == "decimal":
filteredSchema.append([fieldName[0], "DecimalType()"])
elif fieldName[1] == "string":
filteredSchema.append([fieldName[0], "StringType()"])
elif fieldName[1] == "integer":
filteredSchema.append([fieldName[0], "IntegerType()"])
elif fieldName[1] == "date":
filteredSchema.append([fieldName[0], "DateType()"])
sample1 = [(line[0], line[1], True) for line in filteredSchema]
print sample1
fields = [StructField(line[0], line[1], True) for line in filteredSchema]
Run Code Online (Sandbox Code Playgroud)
如果我使用这个:
fields = [StructField(line[0], StringType(), True) for line in filteredSchema]
Run Code Online (Sandbox Code Playgroud)
有用,
但 …
我正在尝试附加到数组,但是由于某种原因,它只是将空白附加到我的数组中。
def schemaClean(x: Array[String]): Array[String] =
{
val array = Array[String]()
for(i <- 0 until x.length){
val convert = x(i).toString
val split = convert.split('|')
if (split.length == 5) {
val drop = split.dropRight(3).mkString(" ")
array :+ drop
}
else if (split.length == 4) {
val drop = split.dropRight(2).mkString(" ")
println(drop)
array :+ drop.toString
println(array.mkString(" "))
}
}
array
}
val schema1 = schemaClean(schema)
Run Code Online (Sandbox Code Playgroud)
打印此:
record_id string
assigned_offer_id string
accepted_offer_flag string
current_offer_flag string
Run Code Online (Sandbox Code Playgroud)
如果我尝试打印schema1,则仅打印1个空白行。
我在Key Value配对中有数据.我试图将过滤函数应用于看起来像这样的数据:
def filterNum(x: Int) : Boolean = {
if (decimalArr.contains(x)) return true
else return false
}
Run Code Online (Sandbox Code Playgroud)
我的Spark代码有:
val numRDD = columnRDD.filter(x => filterNum(x(0)))
Run Code Online (Sandbox Code Playgroud)
但那不会工作,当我发送:
val numRDD = columnRDD.filter(x => filterNum(x))
Run Code Online (Sandbox Code Playgroud)
我收到错误:
<console>:23: error: type mismatch;
found : (Int, String)
required: Int
val numRDD = columnRDD.filter(x => filterNum(x))
Run Code Online (Sandbox Code Playgroud)
我也试图做其他事情,比如改变函数的输入
我稍微看了下面的帖子:了解Spark中的TreeReduce
我仍然试图准确地理解何时使用treeReduce与reduceByKey.我想我们可以使用像字数一样的通用示例来帮助我进一步了解正在发生的事情.
编辑:所以玩spark-shell,我认为我从根本上不了解treeReduce的概念,但希望一个例子和那些问题有帮助.
res2: Array[(String, Int)] = Array((D,1), (18964,1), (D,1), (1,1), ("",1), ("",1), ("",1), ("",1), ("",1), (1,1))
scala> val reduce = input.reduceByKey(_+_)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at reduceByKey at <console>:25
scala> val tree = input.treeReduce(_+_, 2)
<console>:25: error: type mismatch;
found : (String, Int)
required: String
val tree = input.treeReduce(_+_, 2)
Run Code Online (Sandbox Code Playgroud)