小编the*_*ing的帖子

PySpark groupByKey返回pyspark.resultiterable.ResultIterable

我想弄清楚为什么我的groupByKey返回以下内容:

[(0, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a210>), (1, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a4d0>), (2, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a390>), (3, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a290>), (4, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a450>), (5, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a350>), (6, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a1d0>), (7, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a490>), (8, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a050>), (9, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a650>)]
Run Code Online (Sandbox Code Playgroud)

我有flatMapped值,如下所示:

[(0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D')]
Run Code Online (Sandbox Code Playgroud)

我做的很简单:

groupRDD = columnRDD.groupByKey()
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

46
推荐指数
2
解决办法
5万
查看次数

PySpark reduceByKey?添加键/元组

我有以下数据,我想做的是

[(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]
Run Code Online (Sandbox Code Playgroud)

是每个键计数值的实例(1个字符串字符)。所以我首先做了一张地图:

.map(lambda x: (x[0], [x[1], 1]))
Run Code Online (Sandbox Code Playgroud)

现在使其成为以下项的键/元组:

[(13, ['D', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['T', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['T', 1]), (53, ['2', 1]), (54, ['0', …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

8
推荐指数
2
解决办法
2万
查看次数

pySpark使用键/值从RDD创建DataFrame

如果我有一个密钥/值的RDD(密钥是列索引),是否可以将其加载到数据帧中?例如:

(0,1), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)
Run Code Online (Sandbox Code Playgroud)

并使数据框看起来像:

1,2,18
1,10,18
2,20,18
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

7
推荐指数
1
解决办法
2万
查看次数

带有镶木地板和分区的Spark DataFrames

我还没有找到关于这个主题的更多信息,但是我们假设我们使用数据框来读取镶嵌文件中的10块火花自然会创建10个分区.但是当数据帧读入文件以处理它时,它不会处理大数据与分区的比率,因为如果它正在处理未压​​缩的文件,则块大小会大得多,这意味着分区也会更大.

所以让我澄清一下,压缩木地板(这些数字并不完全准确).1GB Par = 5 Blocks = 5个可以解压缩到5GB的分区,使其成为25个块/ 25个分区.但是,除非你重新分区1GB par文件,否则最好只有5个分区,它将是25个分区?或者我的逻辑错了.

重新分配以增加速度是否有意义?或者我在想这个错误.任何人都可以对此有所了解吗?

假设:

  • 1 Block = 1 Spark的分区
  • 1核心在1分区上运行

apache-spark parquet apache-spark-sql

7
推荐指数
1
解决办法
7589
查看次数

Spark独立编号执行器/内核控件

所以我有一个带有16个内核和64GB内存的Spark独立服务器.我在服务器上运行主服务器和工作服务器.我没有启用动态分配.我在Spark 2.0上

我不明白的是,当我提交工作并指明:

--num-executors 2
--executor-cores 2 
Run Code Online (Sandbox Code Playgroud)

只应占用4个核心.然而,当提交作业时,它会占用所有16​​个内核,并且无论如何都会绕过num-executors参数旋转8个执行程序.但如果我将executor-cores参数更改为4它将相应调整,4个执行器将旋转.

apache-spark apache-spark-standalone

7
推荐指数
1
解决办法
5301
查看次数

pySpark DataFrames与SciPy的聚合函数

我尝试了一些不同的场景来尝试使用Spark的1.3 DataFrame来处理像sciPy kurtosis或numpy std这样的东西.这是示例代码,但它只挂在10x10数据集(10行,10列).我试过了:

print df.groupBy().agg(kurtosis(df.offer_id)).collect()

print df.agg(kurtosis(df.offer_ID)).collect()
Run Code Online (Sandbox Code Playgroud)

但这没有问题:

print df.agg(F.min(df.offer_id), F.min(df.decision_id)).collect()
Run Code Online (Sandbox Code Playgroud)

我的猜测是因为F是:from pyspark.sql import functions as F是一个编程的sql函数.我如何使用数据帧来处理数据集上的峰度?

这也只是挂起:

print df.map(kurtosis(df.offer_id)).collect()
Run Code Online (Sandbox Code Playgroud)

dataframe apache-spark pyspark

5
推荐指数
1
解决办法
1601
查看次数

pySpark Data Frames "assert isinstance(dataType, DataType), "dataType 应该是 DataType"

我想动态生成我的数据框架构我有以下错误:

   assert isinstance(dataType, DataType), "dataType should be DataType"
AssertionError: dataType should be DataType
Run Code Online (Sandbox Code Playgroud)

代码:

filteredSchema = []
for line in correctSchema:
    fieldName = line.split(',')
    if fieldName[1] == "decimal":
        filteredSchema.append([fieldName[0], "DecimalType()"])
    elif fieldName[1] == "string":
        filteredSchema.append([fieldName[0], "StringType()"])
    elif fieldName[1] == "integer":
        filteredSchema.append([fieldName[0], "IntegerType()"])
    elif fieldName[1] == "date":
        filteredSchema.append([fieldName[0], "DateType()"])


sample1 = [(line[0], line[1], True) for line in filteredSchema]
print sample1

fields = [StructField(line[0], line[1], True) for line in filteredSchema]
Run Code Online (Sandbox Code Playgroud)

如果我使用这个:

fields = [StructField(line[0], StringType(), True) for line in filteredSchema]
Run Code Online (Sandbox Code Playgroud)

有用,

但 …

dataframe apache-spark pyspark

3
推荐指数
1
解决办法
9037
查看次数

Scala附加到空数组

我正在尝试附加到数组,但是由于某种原因,它只是将空白附加到我的数组中。

  def schemaClean(x: Array[String]): Array[String] =
  {
    val array = Array[String]()
    for(i <- 0 until x.length){
      val convert = x(i).toString
      val split = convert.split('|')
      if (split.length == 5) {
        val drop = split.dropRight(3).mkString(" ")
        array :+ drop
      }
      else if (split.length == 4) {
        val drop = split.dropRight(2).mkString(" ")
        println(drop)
        array :+ drop.toString
        println(array.mkString(" "))
      }
    }
   array
  }


  val schema1 = schemaClean(schema)
Run Code Online (Sandbox Code Playgroud)

打印此:

record_id string

assigned_offer_id string

accepted_offer_flag string

current_offer_flag string
Run Code Online (Sandbox Code Playgroud)

如果我尝试打印schema1,则仅打印1个空白行。

scala

3
推荐指数
1
解决办法
5697
查看次数

Spark键/值过滤器功能

我在Key Value配对中有数据.我试图将过滤函数应用于看起来像这样的数据:

  def filterNum(x: Int) : Boolean = {
    if (decimalArr.contains(x)) return true
    else return false
  }
Run Code Online (Sandbox Code Playgroud)

我的Spark代码有:

val numRDD = columnRDD.filter(x => filterNum(x(0)))
Run Code Online (Sandbox Code Playgroud)

但那不会工作,当我发送:

val numRDD = columnRDD.filter(x => filterNum(x))
Run Code Online (Sandbox Code Playgroud)

我收到错误:

<console>:23: error: type mismatch;
 found   : (Int, String)
 required: Int
       val numRDD = columnRDD.filter(x => filterNum(x))
Run Code Online (Sandbox Code Playgroud)

我也试图做其他事情,比如改变函数的输入

scala apache-spark

1
推荐指数
1
解决办法
1万
查看次数

Spark中的treeReduce与reduceByKey

我稍微看了下面的帖子:了解Spark中的TreeReduce

我仍然试图准确地理解何时使用treeReduce与reduceByKey.我想我们可以使用像字数一样的通用示例来帮助我进一步了解正在发生的事情.

  • 在字数统计中使用reduceByKey总是有意义的吗?
  • 或者,当treeReduce更有意义时,是否存在特定大小的数据?
  • 当treeReduce是更好的选择时,是否有特殊情况或拇指规则?
  • 这也可以在上面基于reduceByKey回答,但是使用reduceByKeyLocally和treeReduce做了任何改变
  • 我如何恰当地确定深度?

编辑:所以玩spark-shell,我认为我从根本上不了解treeReduce的概念,但希望一个例子和那些问题有帮助.

res2: Array[(String, Int)] = Array((D,1), (18964,1), (D,1), (1,1), ("",1), ("",1), ("",1), ("",1), ("",1), (1,1))

scala> val reduce = input.reduceByKey(_+_)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at reduceByKey at <console>:25

scala> val tree = input.treeReduce(_+_, 2)
<console>:25: error: type mismatch;
 found   : (String, Int)
 required: String
       val tree = input.treeReduce(_+_, 2)
Run Code Online (Sandbox Code Playgroud)

apache-spark

1
推荐指数
1
解决办法
4791
查看次数