Spark:groupBy需要花费大量时间

use*_*849 1 reduce aggregate apache-spark

在我的应用程序中,当获取性能数字时,groupby正在吃掉很多时间.

我的RDD低于strcuture:

JavaPairRDD<CustomTuple, Map<String, Double>>
Run Code Online (Sandbox Code Playgroud)

CustomTuple: 此对象包含有关RDD中当前行的信息,如周,月,城市等.

public class CustomTuple implements Serializable{

private Map hierarchyMap = null;
private Map granularMap  = null;
private String timePeriod = null;
private String sourceKey  = null;
}
Run Code Online (Sandbox Code Playgroud)

地图

此地图包含有关该行的统计数据,例如投资额,GRP数量等.

<"Inv", 20>

<"GRP", 30>
Run Code Online (Sandbox Code Playgroud)

我正在这个RDD上执行DAG

  1. 对此RDD应用过滤器并限定相关行:过滤器
  2. 对此RDD应用过滤器并限定相关行:过滤器
  3. 加入RDD:加入
  4. 应用地图阶段计算投资:地图
  5. 应用GroupBy阶段根据所需视图对数据进行分组:GroupBy
  6. 应用地图阶段按照上一步骤中实现的分组聚合数据(比如跨时间段的视图数据),并根据希望收集的结果集创建新对象:Map
  7. 收集结果:收集

因此,如果用户想要跨时间段查看投资,则返回List以下(这是在上面的步骤4中实现的):

<timeperiod1, value> 
Run Code Online (Sandbox Code Playgroud)

当我检查操作时间时,GroupBy占用了执行整个DAG所用时间的90%.

IMO,我们可以通过sing reduce替换GroupBy和后续的Map操作.但是reduce会对JavaPairRDD类型的对象起作用.所以我的reduce会像T reduce(T,T,T),其中T将是CustomTuple,Map.

或者也许在上面的DAG中的第3步之后,我运行另一个map函数,该函数返回一个需要聚合的度量的RDD类型,然后运行reduce.

此外,我不确定聚合函数如何工作,并且它能够在这种情况下帮助我.

其次,我的应用程序将收到不同密钥的请求.在我当前的RDD设计中,每个请求都要求我在此密钥上重新分区或重新分组我的RDD.这意味着对于每个请求,分组/重新分区将占用我95%的时间来计算作业.

<"market1", 20>
<"market2", 30>
Run Code Online (Sandbox Code Playgroud)

这是非常令人沮丧的,因为没有Spark的应用程序的当前性能比Spark的性能好10倍.

任何见解都表示赞赏.

[编辑]我们也注意到JOIN花了很多时间.也许这就是为什么groupby需要时间.[编辑]

TIA!

小智 6

Spark的文档鼓励您避免操作groupBy操作,而是建议使用combineByKey或其衍生操作(reduceByKey或aggregateByKey).您必须使用此操作才能在shuffle之前和之后进行聚合(如果我们使用Hadoop术语,则在Map和Reduce的阶段),这样您的执行时间就会提高(如果它将是10次,我不会知道)更好,但必须更好)

如果我理解您的处理我认为您可以使用单个combineByKey操作以下代码的解释是针对scala代码,但您可以转换为Java代码而无需太多努力.

combineByKey有三个参数:combineByKey [C](createCombiner:(V)⇒C,mergeValue:(C,V)⇒C,mergeCombiners:(C,C)⇒C):RDD [(K,C)]

  • createCombiner:在此操作中,您将创建一个新类,以便合并您的数据,以便将CustomTuple数据聚合到一个新的Class CustomTupleCombiner中(我不知道您是否只想要总和,或者您可能想要将某些流程应用于此数据但可以在此操作中进行任一选项)

  • mergeValue:在这个操作中,你必须描述一个CustomTuple如何与另一个CustumTupleCombiner相加(我再次假设一个简单的汇总操作).例如,如果你想按键对数据求和,你将在你的CustumTupleCombiner类中有一个Map,所以操作应该是这样的:CustumTupleCombiner.sum(CustomTuple),它使CustumTupleCombiner.Map(CustomTuple.key) - > CustomTuple.Map( CustomTuple.key)+ CustumTupleCombiner.value

  • mergeCombiners:在这个操作中你必须定义如何在我的例子中合并两个Combiner类CustumTupleCombiner.因此,这将是这样CustumTupleCombiner1.merge(CustumTupleCombiner2),将是这样的CustumTupleCombiner1.Map.keys.foreach(K - > CustumTupleCombiner1.Map(K)+ CustumTupleCombiner2.Map(K))或类似的东西

pated代码没有被证明(这甚至不会编译因为我使用vim编写)但我认为这可能适用于你的场景.

我希望这会有用