如何优化spark sql并行运行它

use*_*und 6 sql parallel-processing hadoop-yarn apache-spark apache-spark-sql

我是一个火花新手,并使用Spark SQL/hiveContext有一个简单的spark应用程序:

  1. 从蜂巢表中选择数据(10亿行)
  2. 做一些过滤,聚合包括row_number over window function来选择第一行,group by,count()和max()等.
  3. 将结果写入HBase(数亿行)

我提交作业在纱线集群(100个执行器)上运行它,它很慢,当我在Spark UI中查看DAG可视化时,似乎只有hive表扫描任务并行运行,其余步骤#2和#以上3只在一个实例中运行,可能应该能够优化并行化?

该应用程序看起来像:

步骤1:

val input = hiveContext
  .sql(
     SELECT   
           user_id  
           , address  
           , age  
           , phone_number  
           , first_name  
           , last_name  
           , server_ts   
       FROM  
       (     
           SELECT  
               user_id  
               , address  
               , age  
               , phone_number  
               , first_name  
               , last_name  
               , server_ts   
               , row_number() over 
                (partition by user_id, address,  phone_number, first_name, last_name  order by user_id, address, phone_number, first_name, last_name,  server_ts desc, age) AS rn  
           FROM  
           (  
               SELECT  
                   user_id  
                   , address  
                   , age  
                   , phone_number  
                   , first_name  
                   , last_name  
                   , server_ts  
               FROM  
                   table   
               WHERE  
                   phone_number <> '911' AND   
                   server_date >= '2015-12-01' and server_date < '2016-01-01' AND  
                   user_id IS NOT NULL AND  
                   first_name IS NOT NULL AND  
                   last_name IS NOT NULL AND  
                   address IS NOT NULL AND  
                   phone_number IS NOT NULL AND  
           ) all_rows  
       ) all_rows_with_row_number  
       WHERE rn = 1)

val input_tbl = input.registerTempTable(input_tbl)
Run Code Online (Sandbox Code Playgroud)

第2步:

val result = hiveContext.sql(
  SELECT state, 
         phone_number, 
         address, 
         COUNT(*) as hash_count, 
         MAX(server_ts) as latest_ts 
     FROM  
    ( SELECT  
         udf_getState(address) as state  
         , user_id  
         , address  
         , age  
         , phone_number  
         , first_name  
         , last_name  
         , server_ts  
     FROM  
         input_tbl ) input  
     WHERE state IS NOT NULL AND state != ''  
     GROUP BY state, phone_number, address)
Run Code Online (Sandbox Code Playgroud)

第3步:

result.cache()
result.map(x => ...).saveAsNewAPIHadoopDataset(conf)
Run Code Online (Sandbox Code Playgroud)

DAG Visualization看起来像: 在此输入图像描述

正如您所看到的,阶段0中的"过滤器","项目"和"交换"仅在一个实例中运行,stage1和stage2也是如此,因此如果问题是愚蠢的话,那么一些问题和道歉:

  1. 在每个执行程序进行数据混洗后,"过滤器","项目"和"交换"是否发生在驱动程序中?
  2. 什么代码映射到"过滤器","项目"和"交换"?
  3. 我如何并行运行"过滤","项目"和"交换"以优化性能?
  4. 可以并行运行stage1和stage2吗?

Tza*_*har 5

您没有正确读取DAG图表 - 使用单个框可视化每个步骤这一事实并不意味着它不使用多个任务(因此核心)来计算该步骤.

您可以通过向下钻取到阶段视图来查看每个步骤使用的任务数,该阶段显示此阶段的所有任务.

例如,这是一个与您类似的示例DAG可视化:

在此输入图像描述

您可以看到每个阶段都由"单个"步骤列描述.

但是,如果我们查看下表,我们可以看到每个阶段的任务数量:

在此输入图像描述

其中一个仅使用2个任务,但另一个使用220,这意味着在给定足够的可用资源的情况下,数据被分成220个分区并且并行处理分区.

如果深入到该阶段,您可以再次看到它使用了220个任务和所有任务的详细信息.

在此输入图像描述

只有从磁盘读取数据的任务才会在图表中显示为具有这些"多个点",以帮助您了解读取的文件数量.

所以 - 正如拉希德的回答所暗示的那样,检查每个阶段的任务数量.