相关疑难解决方法(0)

如何使用Spark计算累积和

我有一个(String,Int)的rdd,按键排序

val data = Array(("c1",6), ("c2",3),("c3",4))
val rdd = sc.parallelize(data).sortByKey
Run Code Online (Sandbox Code Playgroud)

现在我想要启动第一个键的值为零,后续键为前一个键的总和.

例如:c1 = 0,c2 = c1的值,c3 =(c1值+ c2值),c4 =(c1 + .. + c3值)预期输出:

(c1,0), (c2,6), (c3,9)...
Run Code Online (Sandbox Code Playgroud)

是否有可能实现这一目标?我用地图尝试了它,但总和没有保留在地图内.

var sum = 0 ;
val t = keycount.map{ x => { val temp = sum; sum = sum + x._2 ; (x._1,temp); }}
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

14
推荐指数
1
解决办法
9829
查看次数

如何使用RDD API在分区内排序(并避免跨分区排序)?

它是Hadoop MapReduce shuffle的默认行为,用于对分区内的shuffle键进行排序,但不是跨越分区(这是使得键跨越分区进行排序的总排序)

我会问如何使用Spark RDD实现相同的功能(在分区内排序,但不能跨分区排序)

  1. RDD的sortByKey方法是进行总排序
  2. RDD repartitionAndSortWithinPartitions正在分区内进行排序,但不是跨越分区,但不幸的是,它增加了一个额外的步骤来进行重新分区.

是否有直接的方法在分区内排序但不跨越分区?

apache-spark

11
推荐指数
1
解决办法
5832
查看次数

SparkSQL:使用两列的条件求和

我希望你能帮助我.我有一个DF如下:

val df = sc.parallelize(Seq(
  (1, "a", "2014-12-01", "2015-01-01", 100), 
  (2, "a", "2014-12-01", "2015-01-02", 150),
  (3, "a", "2014-12-01", "2015-01-03", 120), 
  (4, "b", "2015-12-15", "2015-01-01", 100)
)).toDF("id", "prodId", "dateIns", "dateTrans", "value")
.withColumn("dateIns", to_date($"dateIns")
.withColumn("dateTrans", to_date($"dateTrans"))
Run Code Online (Sandbox Code Playgroud)

我很乐意做一个groupBy prodId并汇总'value',将日期范围总结为'dateIns'和'dateTrans'列之间的差异.特别是,我想有一种方法来定义一个条件和,它总结了上述列之间预定义的最大差异内的所有值.即从dateIns('dateTrans' - 'dateIns'<= 10,20,30)10天,20天,30天之间发生的所有值.

在spark中是否有任何预定义的聚合函数允许进行条件求和?你建议开发一个aggr.UDF(如果是这样,任何建议)?我正在使用pySpqrk,但也很高兴获得Scala解决方案.非常感谢!

sql aggregate-functions apache-spark apache-spark-sql pyspark

3
推荐指数
1
解决办法
7534
查看次数

如何在Spark Scala中使用mapPartitions?

我有DocsRDD:RDD [String,String]

val DocsRDD = sc.wholeTextFiles("myDirectory/*" , 2)
Run Code Online (Sandbox Code Playgroud)

DocsRDD:

Doc1.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc2.txt , bla bla bla .....bla \n bla bla \n bla ... bla
Doc3.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc4.txt , bla bla \n  .....\n bla bla bla bla \n ... bla
Run Code Online (Sandbox Code Playgroud)

有没有一种高效,优雅的方法从mapPartitions中提取n-gram?到目前为止,我已经尝试了所有的东西,我已经阅读了至少5次关于mapPartitions的一切,但我仍然无法理解如何使用它!似乎太难以操纵了.总之我想:

val NGramsRDD = DocsRDD.map(x => (x._1 , x._2.sliding(n) ) )
Run Code Online (Sandbox Code Playgroud)

但是有效地使用mapPartitions.我对mapPartitions的基本误解是:

OneDocRDD:RDD [String]

 val OneDocRDD = sc.textFile("myDoc1.txt" , …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

3
推荐指数
1
解决办法
9963
查看次数