Spark,优化DF的度量生成

Mar*_*ele 10 optimization aggregate apache-spark

这是一个优化问题,这是我目前的(工作)情况:

  • Spark使用spark-jobserver以独立模式运行;
  • 我有一个镶木地板文件,其中约有3M行作为一张表缓存在内存中;
  • 该表是来自电子商务网站的所有数据的综合表,每行代表一个用户,但用户可以拥有更多行;

客户端请求是执行SQL查询,并将结果显示在某些表的网页上,每个表都代表一个带计数器的指标,如:

年龄=> 18-20:15位用户,21-35位:42位用户,......

国家=>美国:22个用户,GB:0个用户,......

等等.计算所有表(以及一些关于用户会话,基于活动,期间和年份生成的会话),我们目前有约200个指标.

生产中最后发布的系统使用(将df视为SQL查询产生的DataFrame):

df.rdd.aggregate(metricsMap) (

      (acc: MetricsMap, r:Row) => {
        acc.analyzeRow(r)
        acc
      },

      (acc1: MetricsMap, acc2: MetricsMap) => {
        acc1.merge(acc2)
        acc1
      }
    ) 
Run Code Online (Sandbox Code Playgroud)

其中MetricsMap是用于提取和从行聚合数据的对象.

此操作非常占用CPU,并且在服务器上从没有参数的查询中提取数据需要大约20秒(因此来自镶木地板文件中的所有数据).

我决定使用聚合,因为对于他们的统计分析,他们想要多个预期:一些指标应该由用户密钥计数,另一个指标用户名(对于网站......)和另一个按产品密钥计算.使用这种方法我只需要循环一次结果,但我不知道这是否是更好的方法......

这是更好的方法,还是存在一些其他(更快)的方法来获得相同的结果?

关于预先计算度量的问题,他们可以对数据集进行的查询没有约束,所以我不知道这是否可能......你能举个例子吗?

回答一些问题

Lio*_*aga 1

在这种情况下,如果您想要回答同一请求的多个查询,则数据的一个路径显然比多个循环更好。

尽管不与 Spark Core 互操作,但可能会更有效。

例如,如果您的 DF 架构如下:

root
 -- age
 -- country
Run Code Online (Sandbox Code Playgroud)

然后您可以尝试执行以下伪基本查询:

Select 
CASE WHEN (age BETWEEN 18 AND 22) THEN '18-22' 
     WHEN (age BETWEEN 22 AND 30) THEN '22-30' 
     ELSE 'Other' as age_group,
country
from metrics_df
Run Code Online (Sandbox Code Playgroud)

您还可以考虑对年龄组使用 UDF。正如 @assaf-mendelson 提到的,更多信息在这里会很有用。