标签: rdd

RDD join : 加入两个不同的对RDD后,结果RDD键值和顺序是否发生了变化?

我有两对 RDD 可以说

 RDD1 : [(1,a),(2,b),(3,c)]    
 RDD2 : [(1,d),(2,e),(3,f)]
Run Code Online (Sandbox Code Playgroud)

现在我使用 join 加入这些 RDD

 RDD3 = RDD1.join(RDD2);
Run Code Online (Sandbox Code Playgroud)

我用下面的代码显示了 RDD3 中的元素

 for(Tuple2<Integer,Tuple2<String,String>> tuple : RDD3.collect()) 
                      System.out.println(tuple._1()+":"+tuple._2()._1()+","+tuple._2()._2());
Run Code Online (Sandbox Code Playgroud)

我见过奇怪的结果,比如

5:b,e
4:a,d 
6:c,f
Run Code Online (Sandbox Code Playgroud)

正如我预期的那样

1:a,d
1:b,e 
1:c,f
Run Code Online (Sandbox Code Playgroud)

有没有办法获得像上面那样的所需输出?或者我错误地解释了 RDD 行为?请建议

编辑 :

其实我正在阅读这样的数据

JavaDoubleRDD data1 = sc.parallelizeDoubles(Arrays.asList(45.25,22.15,33.24));
JavaDoubleRDD data2 = sc.parallelizeDoubles(Arrays.asList(23.45,19.35,12.45));
Run Code Online (Sandbox Code Playgroud)

进而

JavaPairRDD<Double,Double> lat1 = data1.cartesian(data1);
JavaRDD<Double> lat2 = lat1.map(new Function<Tuple2<Double,Double>,Double>() {
    @Override
    public Double call(Tuple2<Double,Double> t) {
        return Math.pow(t._1()-t._2(),2);
    }
});
 //flag and flag1 are static variables initially equal to 1
JavaPairRDD<Integer,Double> lat3 = …
Run Code Online (Sandbox Code Playgroud)

java join apache-spark rdd

0
推荐指数
1
解决办法
6953
查看次数

Spark隐式RDD转换不起作用

我对Spark的分隔数据排序似乎有类似的问题,但是接受的解决方案并不能解决我的问题.

我正在尝试在一个简单的RDD上应用combineByKey:

    package foo
    import org.apache.spark._
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext._

    object HelloTest {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("Test")
        val sc = new SparkContext(sparkConf)
        val input = sc.textFile("/path/to/test.txt")
        val result = input.combineByKey(
          (v) => (v, 1), 
          (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), 
          (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
        ).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
        result.collectAsMap().map(println(_))

           sc.stop()
         }
    }  
Run Code Online (Sandbox Code Playgroud)

编译时我得到(唯一的)跟随错误: …

scala apache-spark rdd

0
推荐指数
1
解决办法
1046
查看次数

Spark RDD groupByKey + join 与 join 性能对比

我正在与其他用户共享的集群上使用 Spark。所以仅仅根据运行时间来判断我的哪一个代码运行效率更高是不可靠的。因为当我运行更高效的代码时,其他人可能会运行大量数据,并使我的代码执行更长时间。

那么我可以在这里问两个问题吗:

  1. 我正在使用joinfunction 来 join 2RDDs并且我尝试groupByKey()在 using 之前使用join,如下所示:

    rdd1.groupByKey().join(rdd2)
    
    Run Code Online (Sandbox Code Playgroud)

    似乎花了更长的时间,但是我记得当我使用 Hadoop Hive 时,group by 让我的查询运行得更快。由于 Spark 使用惰性求值,我想知道groupByKeybefore是否join会让事情变得更快

  2. 我注意到Spark有一个SQL模块,到目前为止我真的没有时间尝试它,但是我可以问一下SQL模块和RDD SQL类似功能之间有什么区别吗?

apache-spark rdd apache-spark-sql pyspark

0
推荐指数
1
解决办法
4321
查看次数

将列表的RDD转换为Dataframe

我正在尝试将RDD列表转换为DataframeSpark中的列表.

RDD:

['ABC', 'AA', 'SSS', 'color-0-value', 'AAAAA_VVVV0-value_1', '1', 'WARNING', 'No test data for negative population! Re-using negative population for non-backtest.']
['ABC', 'SS', 'AA', 'color-0-SS', 'GG0-value_1', '1', 'Temp', 'After, date differences are outside tolerance (10 days) 95.1% of the time']
Run Code Online (Sandbox Code Playgroud)

这是RDD多个列表的内容.

如何将其转换为数据帧?目前,它正在将其转换为单列,但我需要多列.

Dataframe
+--------------+
|            _1|
+--------------+
|['ABC', 'AA...|
|['ABC', 'SS...|
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark rdd

0
推荐指数
1
解决办法
1124
查看次数

RDD地图功能的工作方式不同

我有下面的代码,通常map函数是一个高阶函数,它在其参数中获取一个函数并使用该函数计算元素.但在这种情况下,map不是一个函数而是一个Map类型.无法理解地图功能如何工作?

Spark context available as sc (master = yarn-client, app id = application_1473775536920_2711).
SQL context available as sqlContext.

scala> val pws = Map("Apache Spark" -> "http://spark.apache.org/", "Scala" -> "http://www.scala-lang.org/")
pws: scala.collection.immutable.Map[String,String] = Map(Apache Spark -> http://spark.apache.org/, Scala -> http://www.scala-lang.org/)

scala> val websites = sc.parallelize(Seq("Apache Spark", "Scala")).map(pws).collect
16/09/23 02:50:15 WARN util.ClosureCleaner: Expected a closure; got scala.collection.immutable.Map$Map2
[Stage 0:>                                                          (0 + 0) / 2]16/09/23 02:50:31 WARN cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark rdd

0
推荐指数
1
解决办法
239
查看次数

在 Spark Java API 中将 JavaPairRDD 转换为 Dataframe

我在 Java 7 中使用 Spark 1.6

我有一对RDD:

JavaPairRDD<String, String> filesRDD = sc.wholeTextFiles(args[0]);
Run Code Online (Sandbox Code Playgroud)

我想将其转换DataFrame为模式。

看来首先我必须将pairRDD转换为RowRDD。

那么如何从 PairRDD 创建 RowRdd 呢?

java apache-spark rdd spark-dataframe java-pair-rdd

0
推荐指数
1
解决办法
6200
查看次数

如何同时使用两个功能对RDD条目进行排序?

我有一个 Spark RDD,我想以有组织的方式对其条目进行排序。假设条目是一个包含 3 个元素的元组(name,phonenumber,timestamp)。我想首先根据 的值对条目进行排序phonenumber,然后根据 的值进行timestamp排序,同时尊重而不是更改基于phonenumber. (所以timestamp只根据phonenumber排序重新排列)。是否有 Spark 函数来执行此操作?

(我在 Scala 中使用 Spark 2.x)

scala apache-spark rdd apache-spark-2.0

0
推荐指数
1
解决办法
572
查看次数

由于 False 作为条目,pyspark 中 json 文件的记录已损坏

我有一个 json 文件,如下所示:

test= {'kpiData': [{'date': '2020-06-03 10:05',
   'a': 'MINIMUMINTERVAL',
   'b': 0.0,
   'c': True},
  {'date': '2020-06-03 10:10',
   'a': 'MINIMUMINTERVAL',
   'b': 0.0,
   'c': True},
  {'date': '2020-06-03 10:15',
   'a': 'MINIMUMINTERVAL',
   'b': 0.0,
   'c': True},
  {'date': '2020-06-03 10:20',
   'a': 'MINIMUMINTERVAL',
   'b': 0.0,}
]}
Run Code Online (Sandbox Code Playgroud)

我想将其传输到数据框对象,如下所示:

rdd = sc.parallelize([test])
jsonDF = spark.read.json(rdd)
Run Code Online (Sandbox Code Playgroud)

这会导致记录损坏。据我了解,其原因是,TrueFalse不能是 Python 中的条目。所以我需要在之前将这些条目转换spark.read.json()为 TRUE、true 或“True”)。test 是一个字典,rdd 是一个 pyspark.rdd.RDD 对象。对于数据帧对象,转换非常简单,但我没有找到这些对象的解决方案。

json apache-spark rdd apache-spark-sql pyspark

0
推荐指数
1
解决办法
2163
查看次数

将RDD [Array [Int]]求和到RDD [Int]

我有一个RDD的Array [Int],我想要RDD [Int]形式的每个数组中所有元素的总和.实现这一目标的最佳方法是什么?

arrays scala apache-spark rdd

-5
推荐指数
1
解决办法
393
查看次数