sim*_*yun 7 java hbase apache-spark rdd apache-spark-sql
我的代码算法如下
Step1.获取一个hbase实体数据到hBaseRDD
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
jsc.newAPIHadoopRDD(hbase_conf, TableInputFormat.class,
ImmutableBytesWritable.class, Result.class);
Run Code Online (Sandbox Code Playgroud)
第2步.将hBaseRDD转换为rowPairRDD
// in the rowPairRDD the key is hbase's row key, The Row is the hbase's Row data
JavaPairRDD<String, Row> rowPairRDD = hBaseRDD
.mapToPair(***);
dataRDD.repartition(500);
dataRDD.cache();
Run Code Online (Sandbox Code Playgroud)
Step3.将rowPairRDD转换为schemaRDD
JavaSchemaRDD schemaRDD = sqlContext.applySchema(rowPairRDD.values(), schema);
schemaRDD.registerTempTable("testentity");
sqlContext.sqlContext().cacheTable("testentity");
Run Code Online (Sandbox Code Playgroud)
Step4.使用spark sql做第一个简单的sql查询.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);
JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE
column3 = 'value1' ")
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
Run Code Online (Sandbox Code Playgroud)
Step5.使用spark sql做第二个简单的sql查询.
JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity
WHERE column3 = 'value2' ")
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
Run Code Online (Sandbox Code Playgroud)
第六步.用spark sql做第三个简单的sql查询.
JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value3' ");
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
Run Code Online (Sandbox Code Playgroud)
测试结果如下:
测试案例1:
当我插入300,000条记录时,hbase实体,然后运行代码.
如果我使用hbase Api进行类似的查询,它只需要2000毫秒.显然,最后2个spark sql查询比hbase api查询快得多.
我相信第一个spark sql查询花了很多时间从hbase加载数据.
因此,第一个查询比最后两个查询慢得多.我认为结果是预期的
测试案例2:
当我插入400,000条记录时.hbase实体,然后运行代码.
如果我使用hbase Api进行类似的查询,它只需要3500毫秒.显然,3个spark sql查询比hbase api查询要慢得多.
而最后2个spark sql querys也很慢而且性能类似于第一个查询,为什么呢?我该如何调整性能?
我怀疑您尝试缓存的数据多于分配给 Spark 实例的数据。我将尝试分解完全相同的查询的每次执行中发生的情况。
首先,Spark 中的一切都是惰性的。这意味着当您调用 时rdd.cache(),在您对 RDD 执行某些操作之前实际上不会发生任何事情。
第一个查询
第二次/第三次查询
现在,Spark 将尝试缓存尽可能多的 RDD。如果它无法缓存整个内容,您可能会遇到一些严重的速度减慢的情况。如果缓存之前的步骤之一导致随机播放,则尤其如此。您可能会在第一个查询中为每个后续查询重复步骤 1 - 3。这并不理想。
要查看您是否未完全缓存 RDD,请转到 Spark Web UI(http://localhost:4040如果处于本地独立模式)并查找 RDD 存储/持久性信息。确保其处于 100%。
编辑(根据评论):
40万条数据大小在我的hbase中只有250MB左右。为什么我需要使用2G来解决这个问题(但是1G>>250MB)
我不能肯定地说为什么你达到了最大限制spark.executor.memory=1G,但我会添加一些有关缓存的更多相关信息。
spark.storage.memoryFraction=0.660%。所以你真的只是得到了1GB * 0.6。Object元数据会产生相当大的开销。您可以更改默认的持久性级别。您知道如何缓存所有数据以避免第一次查询性能不佳吗?
调用任何操作都会导致 RDD 被缓存。只要这样做
scala> rdd.cache
scala> rdd.count
Run Code Online (Sandbox Code Playgroud)
现在它已被缓存。