小编use*_*849的帖子

如何更新RDD?

我们正在开发Spark框架,其中我们将历史数据移动到RDD集合中.

基本上,RDD是我们进行操作的不可变的只读数据集.基于此,我们已将历史数据移至RDD,并在此类RDD上进行过滤/映射等计算.

现在有一个用例,RDD中的数据子集得到更新,我们必须重新计算这些值.

HistoricalData采用RDD的形式.我根据请求范围创建另一个RDD,并在ScopeCollection中保存该RDD的引用

到目前为止,我已经能够想到以下方法 -

方法1:广播变化:

  1. 对于每个更改请求,我的服务器获取特定于范围的RDD并生成作业
  2. 在工作中,在该RDD上应用地图阶段 -

    2.a. 对于RDD中的每个节点,在广播上进行查找并创建一个现在更新的新值,从而创建一个新的RDD
    2.b. 现在我在step2.a上再次对这个新的RDD进行所有计算.像乘法,减少等
    2.c. 我将此RDDs引用保存在我的ScopeCollection中

方法2:为更新创建RDD

  1. 对于每个更改请求,我的服务器获取特定于范围的RDD并生成作业
  2. 在每个RDD上,使用具有更改的新RDD进行连接
  3. 现在我在步骤2中再次对这个新的RDD进行所有计算,如乘法,减少等

方法3:

我曾想过创建流RDD,我不断更新相同的RDD并进行重新计算.但据我所知,它可以从Flume或Kafka获取流.而在我的情况下,值是基于用户交互在应用程序本身中生成的.因此,我无法在上下文中看到流RDD的任何集成点.

关于哪种方法更好或任何其他适合此方案的方法的任何建议.

TIA!

apache-spark spark-streaming rdd

19
推荐指数
1
解决办法
5260
查看次数

Spark:groupBy需要花费大量时间

在我的应用程序中,当获取性能数字时,groupby正在吃掉很多时间.

我的RDD低于strcuture:

JavaPairRDD<CustomTuple, Map<String, Double>>
Run Code Online (Sandbox Code Playgroud)

CustomTuple: 此对象包含有关RDD中当前行的信息,如周,月,城市等.

public class CustomTuple implements Serializable{

private Map hierarchyMap = null;
private Map granularMap  = null;
private String timePeriod = null;
private String sourceKey  = null;
}
Run Code Online (Sandbox Code Playgroud)

地图

此地图包含有关该行的统计数据,例如投资额,GRP数量等.

<"Inv", 20>

<"GRP", 30>
Run Code Online (Sandbox Code Playgroud)

我正在这个RDD上执行DAG

  1. 对此RDD应用过滤器并限定相关行:过滤器
  2. 对此RDD应用过滤器并限定相关行:过滤器
  3. 加入RDD:加入
  4. 应用地图阶段计算投资:地图
  5. 应用GroupBy阶段根据所需视图对数据进行分组:GroupBy
  6. 应用地图阶段按照上一步骤中实现的分组聚合数据(比如跨时间段的视图数据),并根据希望收集的结果集创建新对象:Map
  7. 收集结果:收集

因此,如果用户想要跨时间段查看投资,则返回List以下(这是在上面的步骤4中实现的):

<timeperiod1, value> 
Run Code Online (Sandbox Code Playgroud)

当我检查操作时间时,GroupBy占用了执行整个DAG所用时间的90%.

IMO,我们可以通过sing reduce替换GroupBy和后续的Map操作.但是reduce会对JavaPairRDD类型的对象起作用.所以我的reduce会像T reduce(T,T,T),其中T将是CustomTuple,Map.

或者也许在上面的DAG中的第3步之后,我运行另一个map函数,该函数返回一个需要聚合的度量的RDD类型,然后运行reduce.

此外,我不确定聚合函数如何工作,并且它能够在这种情况下帮助我.

其次,我的应用程序将收到不同密钥的请求.在我当前的RDD设计中,每个请求都要求我在此密钥上重新分区或重新分组我的RDD.这意味着对于每个请求,分组/重新分区将占用我95%的时间来计算作业.

<"market1", 20>
<"market2", 30>
Run Code Online (Sandbox Code Playgroud)

这是非常令人沮丧的,因为没有Spark的应用程序的当前性能比Spark的性能好10倍.

任何见解都表示赞赏.

[编辑]我们也注意到JOIN花了很多时间.也许这就是为什么groupby需要时间.[编辑]

TIA!

reduce aggregate apache-spark

1
推荐指数
1
解决办法
5214
查看次数

标签 统计

apache-spark ×2

aggregate ×1

rdd ×1

reduce ×1

spark-streaming ×1