Spark SQL性能

sim*_*yun 7 java hbase apache-spark rdd apache-spark-sql

我的代码算法如下
Step1.获取一个hbase实体数据到hBaseRDD

      JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = 
                 jsc.newAPIHadoopRDD(hbase_conf,  TableInputFormat.class,
                 ImmutableBytesWritable.class, Result.class); 
Run Code Online (Sandbox Code Playgroud)

第2步.将hBaseRDD转换为rowPairRDD

     // in the rowPairRDD the key is hbase's row key, The Row is the hbase's Row data 
     JavaPairRDD<String, Row> rowPairRDD = hBaseRDD 
                            .mapToPair(***); 
    dataRDD.repartition(500);
        dataRDD.cache();
Run Code Online (Sandbox Code Playgroud)

Step3.将rowPairRDD转换为schemaRDD

            JavaSchemaRDD schemaRDD =   sqlContext.applySchema(rowPairRDD.values(), schema); 
            schemaRDD.registerTempTable("testentity"); 
           sqlContext.sqlContext().cacheTable("testentity");
Run Code Online (Sandbox Code Playgroud)

Step4.使用spark sql做第一个简单的sql查询.

   JavaSQLContext  sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);
    JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE 
             column3 = 'value1' ") 
     List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect(); 
Run Code Online (Sandbox Code Playgroud)

Step5.使用spark sql做第二个简单的sql查询.

JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity 
                                     WHERE column3 = 'value2' ") 
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect(); 
Run Code Online (Sandbox Code Playgroud)

第六步.用spark sql做第三个简单的sql查询.

JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value3' "); 
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect(); 
Run Code Online (Sandbox Code Playgroud)

测试结果如下:

测试案例1:

当我插入300,000条记录时,hbase实体,然后运行代码.

  • 第一个查询需要60407毫秒
  • 第二个查询需要838毫秒
  • 3td查询需要792毫秒

如果我使用hbase Api进行类似的查询,它只需要2000毫秒.显然,最后2个spark sql查询比hbase api查询快得多.
我相信第一个spark sql查询花了很多时间从hbase加载数据.
因此,第一个查询比最后两个查询慢得多.我认为结果是预期的

测试案例2:

当我插入400,000条记录时.hbase实体,然后运行代码.

  • 第一个查询需要87213毫秒
  • 第二个查询需要83238毫秒
  • 3td查询需要82092毫秒

如果我使用hbase Api进行类似的查询,它只需要3500毫秒.显然,3个spark sql查询比hbase api查询要慢得多.
而最后2个spark sql querys也很慢而且性能类似于第一个查询,为什么呢?我该如何调整性能?

Mik*_*ark 3

我怀疑您尝试缓存的数据多于分配给 Spark 实例的数据。我将尝试分解完全相同的查询的每次执行中发生的情况。

首先,Spark 中的一切都是惰性的。这意味着当您调用 时rdd.cache(),在您对 RDD 执行某些操作之前实际上不会发生任何事情。

第一个查询

  1. 完整 HBase 扫描(慢)
  2. 增加分区数量(导致随机,缓慢)
  3. 数据实际上缓存到内存中,因为 Spark 很懒(有点慢)
  4. 应用 where 谓词(快速)
  5. 结果已收集

第二次/第三次查询

  1. 完整内存扫描(快速)
  2. 应用 where 谓词(快速)
  3. 结果已收集

现在,Spark 将尝试缓存尽可能多的 RDD。如果它无法缓存整个内容,您可能会遇到一些严重的速度减慢的情况。如果缓存之前的步骤之一导致随机播放,则尤其如此。您可能会在第一个查询中为每个后续查询重复步骤 1 - 3。这并不理想。

要查看您是否未完全缓存 RDD,请转到 Spark Web UI(http://localhost:4040如果处于本地独立模式)并查找 RDD 存储/持久性信息。确保其处于 100%。

编辑(根据评论):

40万条数据大小在我的hbase中只有250MB左右。为什么我需要使用2G来解决这个问题(但是1G>>250MB)

我不能肯定地说为什么你达到了最大限制spark.executor.memory=1G,但我会添加一些有关缓存的更多相关信息。

  • Spark 仅分配执行器堆内存的一定百分比用于缓存。默认情况下,该值为spark.storage.memoryFraction=0.660%。所以你真的只是得到了1GB * 0.6
  • HBase 中使用的总空间可能与 Spark 中缓存时占用的总堆空间不同。默认情况下,Spark 在内存中存储 Java 对象时不会序列化。因此,存储 JavaObject元数据会产生相当大的开销。您可以更改默认的持久性级别

您知道如何缓存所有数据以避免第一次查询性能不佳吗?

调用任何操作都会导致 RDD 被缓存。只要这样做

scala> rdd.cache
scala> rdd.count
Run Code Online (Sandbox Code Playgroud)

现在它已被缓存。