我有一个(String,Int)的rdd,按键排序
val data = Array(("c1",6), ("c2",3),("c3",4))
val rdd = sc.parallelize(data).sortByKey
Run Code Online (Sandbox Code Playgroud)
现在我想要启动第一个键的值为零,后续键为前一个键的总和.
例如:c1 = 0,c2 = c1的值,c3 =(c1值+ c2值),c4 =(c1 + .. + c3值)预期输出:
(c1,0), (c2,6), (c3,9)...
Run Code Online (Sandbox Code Playgroud)
是否有可能实现这一目标?我用地图尝试了它,但总和没有保留在地图内.
var sum = 0 ;
val t = keycount.map{ x => { val temp = sum; sum = sum + x._2 ; (x._1,temp); }}
Run Code Online (Sandbox Code Playgroud) 它是Hadoop MapReduce shuffle的默认行为,用于对分区内的shuffle键进行排序,但不是跨越分区(这是使得键跨越分区进行排序的总排序)
我会问如何使用Spark RDD实现相同的功能(在分区内排序,但不能跨分区排序)
sortByKey方法是进行总排序repartitionAndSortWithinPartitions正在分区内进行排序,但不是跨越分区,但不幸的是,它增加了一个额外的步骤来进行重新分区.是否有直接的方法在分区内排序但不跨越分区?
我希望你能帮助我.我有一个DF如下:
val df = sc.parallelize(Seq(
(1, "a", "2014-12-01", "2015-01-01", 100),
(2, "a", "2014-12-01", "2015-01-02", 150),
(3, "a", "2014-12-01", "2015-01-03", 120),
(4, "b", "2015-12-15", "2015-01-01", 100)
)).toDF("id", "prodId", "dateIns", "dateTrans", "value")
.withColumn("dateIns", to_date($"dateIns")
.withColumn("dateTrans", to_date($"dateTrans"))
Run Code Online (Sandbox Code Playgroud)
我很乐意做一个groupBy prodId并汇总'value',将日期范围总结为'dateIns'和'dateTrans'列之间的差异.特别是,我想有一种方法来定义一个条件和,它总结了上述列之间预定义的最大差异内的所有值.即从dateIns('dateTrans' - 'dateIns'<= 10,20,30)10天,20天,30天之间发生的所有值.
在spark中是否有任何预定义的聚合函数允许进行条件求和?你建议开发一个aggr.UDF(如果是这样,任何建议)?我正在使用pySpqrk,但也很高兴获得Scala解决方案.非常感谢!
sql aggregate-functions apache-spark apache-spark-sql pyspark
我有DocsRDD:RDD [String,String]
val DocsRDD = sc.wholeTextFiles("myDirectory/*" , 2)
Run Code Online (Sandbox Code Playgroud)
DocsRDD:
Doc1.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc2.txt , bla bla bla .....bla \n bla bla \n bla ... bla
Doc3.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc4.txt , bla bla \n .....\n bla bla bla bla \n ... bla
Run Code Online (Sandbox Code Playgroud)
有没有一种高效,优雅的方法从mapPartitions中提取n-gram?到目前为止,我已经尝试了所有的东西,我已经阅读了至少5次关于mapPartitions的一切,但我仍然无法理解如何使用它!似乎太难以操纵了.总之我想:
val NGramsRDD = DocsRDD.map(x => (x._1 , x._2.sliding(n) ) )
Run Code Online (Sandbox Code Playgroud)
但是有效地使用mapPartitions.我对mapPartitions的基本误解是:
OneDocRDD:RDD [String]
val OneDocRDD = sc.textFile("myDoc1.txt" , …Run Code Online (Sandbox Code Playgroud)