小编blu*_*sky的帖子

在Scala Spark中找不到reduceByKey方法

试图从源代码运行http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala.

这一行:

val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
Run Code Online (Sandbox Code Playgroud)

投掷错误

value reduceByKey is not a member of org.apache.spark.rdd.RDD[(String, Int)]
  val wordCounts = logData.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
Run Code Online (Sandbox Code Playgroud)

logData.flatMap(line => line.split(" ")).map(word => (word, 1))返回MappedRDD,但我在http://spark.apache.org/docs/0.9.1/api/core/index.html#org.apache.spark.rdd.RDD中找不到此类型

我从Spark源代码运行此代码,因此可能是类路径问题?但是必需的依赖项在我的类路径上.

scala apache-spark rdd

24
推荐指数
1
解决办法
1万
查看次数

在Apache Spark(Scala)中使用reduceByKey

我有一个类型的元组列表:(用户ID,名称,计数).

例如,

val x = sc.parallelize(List(
    ("a", "b", 1),
    ("a", "b", 1),
    ("c", "b", 1),
    ("a", "d", 1))
)
Run Code Online (Sandbox Code Playgroud)

我正在尝试将此集合减少为计算每个元素名称的类型.

所以在上面val x被转换为:

(a,ArrayBuffer((d,1), (b,2)))
(c,ArrayBuffer((b,1)))
Run Code Online (Sandbox Code Playgroud)

这是我目前使用的代码:

val byKey = x.map({case (id,uri,count) => (id,uri)->count})

val grouped = byKey.groupByKey
val count = grouped.map{case ((id,uri),count) => ((id),(uri,count.sum))}
val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey

grouped2.foreach(println)
Run Code Online (Sandbox Code Playgroud)

我正在尝试使用reduceByKey,因为它比groupByKey执行得更快.

如何实现reduceByKey而不是上面的代码来提供相同的映射?

scala apache-spark

23
推荐指数
2
解决办法
6万
查看次数

什么是在Apache Spark中的shuffle read和shuffle write

在端口8080上运行的Spark管理员的下方屏幕截图中:

在此输入图像描述

对于此代码,"Shuffle Read"和"Shuffle Write"参数始终为空:

import org.apache.spark.SparkContext;

object first {
  println("Welcome to the Scala worksheet")

  val conf = new org.apache.spark.SparkConf()
    .setMaster("local")
    .setAppName("distances")
    .setSparkHome("C:\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4")
    .set("spark.executor.memory", "2g")
  val sc = new SparkContext(conf)

  def euclDistance(userA: User, userB: User) = {

    val subElements = (userA.features zip userB.features) map {
      m => (m._1 - m._2) * (m._1 - m._2)
    }
    val summed = subElements.sum
    val sqRoot = Math.sqrt(summed)

    println("value is" + sqRoot)
    ((userA.name, userB.name), sqRoot)
  }

  case class User(name: String, features: Vector[Double])

  def createUser(data: String) = …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

23
推荐指数
2
解决办法
2万
查看次数

如何使用Spark/Scala压缩集合?

在Scala中,我可以使用以下方法展平集合:

val array = Array(List("1,2,3").iterator,List("1,4,5").iterator)
                                                  //> array  : Array[Iterator[String]] = Array(non-empty iterator, non-empty itera
                                                  //| tor)


    array.toList.flatten                      //> res0: List[String] = List(1,2,3, 1,4,5)
Run Code Online (Sandbox Code Playgroud)

但是我如何在Spark中执行类似的操作?

阅读API文档http://spark.apache.org/docs/0.7.3/api/core/index.html#spark.RDD似乎没有提供此功能的方法?

scala apache-spark

21
推荐指数
2
解决办法
3万
查看次数

如何将List <String>列表转换为csv字符串

我有一个List字符串.是否有Java便捷方法将其转换ListCSV String?所以"test1,test2,test3"是List 3 String元素的转换结果,其中包含"test1""test2""test3"

我可以自己编写方法来转换,String但也许这已经由API实现了?

java csv string

20
推荐指数
6
解决办法
4万
查看次数

在敲门js中对<! - ko if:$ parent.name == name - >感到困惑

我遇到过这个Knockout代码,我对以下代码的执行情况感到困惑:

   <!-- ko if: $parent.name == name -->
   <a data-bind='text: name'></a> 
   <!-- /ko -->
Run Code Online (Sandbox Code Playgroud)

这段代码不应该被解释为评论吗?

阅读文档:
http://knockoutjs.com/documentation/custom-bindings-for-virtual-elements.html
这看起来像一个自定义绑定?

javascript knockout.js

18
推荐指数
2
解决办法
3万
查看次数

使用"saveAsTextFile"时Spark生成的文件是什么?

当我运行Spark作业并使用https://spark.apache.org/docs/0.9.1/api/core/index.html#org.apache中指定的方法"saveAsTextFile"将输出保存为文本文件时. spark.rdd.RDD:

在此输入图像描述

这是创建的文件:

在此输入图像描述

.crc文件是循环冗余校验文件吗?用于检查每个生成的文件的内容是否正确?

_SUCCESS文件始终为空,这表示什么?

上面屏幕截图中没有扩展名的文件包含来自RDD的实际数据,但为什么生成了多个文件而不是一个?

apache-spark

18
推荐指数
1
解决办法
9421
查看次数

`pyspark mllib`与`pyspark ml`包

pyspark mllibpyspark ml包有什么区别?:

https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html

https://spark.apache.org/docs/latest/api/python/pyspark.ml.html

pyspark mllib 似乎是数据帧级别的目标算法 pyspark ml

我找到的一个区别是pyspark ml实现pyspark.ml.tuning.CrossValidatorpyspark mllib不是.

我的理解是,如果在Apache Spark框架上实现算法,库应该使用,mllib但似乎存在分裂?

在没有转换类型的情况下,每个框架之间似乎没有互操作性,因为它们各自包含不同的包结构.

python python-3.x apache-spark pyspark

18
推荐指数
1
解决办法
3892
查看次数

如何测试ajax错误回调?

在ajax请求中如何测试错误回调?是否可以模拟网络连接错误?

    $.ajax({
        url: "myUrl",       
        type: 'post',
        dataType : "json",
        data : ({
            myJson
        }),
        success : function(jsonSaveResponse) {  

        },
        error: function (xhr) {

    } 

    }); 
Run Code Online (Sandbox Code Playgroud)

javascript jquery selenium qunit

17
推荐指数
1
解决办法
1万
查看次数

JavaConverters asScala方法的时间复杂度

从Scala版本2.9开始,存在一个方便的转换器,可以java.util.List通过写下这样的内容将其他集合转换为Scala的数据结构:

import scala.collection.JavaConverters._
def scalaVersion = callJavaMethod.asScala
Run Code Online (Sandbox Code Playgroud)

这是一个可爱的小功能,因为它允许在与现有Java代码交互时利用Scala的优势.

但是,我不确定所涉及的时间和空间复杂性,并且在官方文档中找不到任何内容,因此,以下问题:

我在哪里可以获得有关JavaConverters的复杂性(时间和空间)的信息?

java scala scala-java-interop

15
推荐指数
1
解决办法
3041
查看次数