Mic*_*l N 1 hive apache-spark apache-spark-sql
我有两个大型Hive表,我想将它们与spark.sql连接。假设我们有表1和表2,表1中有500万行,表2中有7000万行。表是活泼的格式,并作为拼花文件存储在Hive中。
我想加入它们,并对某些列进行一些汇总,可以说计算所有行和一列的平均值(例如doubleColumn),同时使用两个条件进行过滤(在col1,col2上说)。
注意:我在一台机器上进行测试安装(虽然功能很强大)。我希望集群中的性能可能会有所不同。
我的第一次尝试是使用spark sql像这样:
val stat = sqlContext.sql("select count(id), avg(doubleColumn) " +
" FROM db.table1 as t1 JOIN db.table2 " +
" ON t1.id = t2.id " +
" WHERE col1 = val1 AND col2 = val2").collect
Run Code Online (Sandbox Code Playgroud)
不幸的是,即使我为每个执行程序和驱动程序提供至少8 GB的内存,运行时间也只有大约5分钟,非常差。我还尝试使用数据帧语法,并尝试首先过滤行并仅选择特定的列以具有更好的选择性,例如:
//Filter first and select only needed column
val df = spark.sql("SELECT * FROM db.tab1")
val tab1= df.filter($"col1" === "val1" && $"col2" === "val2").select("id")
val tab2= spark.sql("SELECT id, doubleColumn FROM db.tab2")
val joined = tab1.as("d1").join(tab2.as("d2"), $"d1.id" === $"d2.id")
//Take the aggregations on the joined df
import org.apache.spark.sql.functions;
joined.agg(
functions.count("id").as("count"),
functions.avg("doubleColumn").as("average")
).show();
Run Code Online (Sandbox Code Playgroud)
但这并没有明显的性能提升。如何提高联接的性能?
哪种是执行spark.sql或dataframe语法的最佳方法?
给更多的执行者或记忆会有所帮助吗?
我应该使用缓存吗?
我同时缓存了两个数据框tab1,tab2和联接聚合,但是我认为缓存我的数据框不切实际,因为我们对并发性很感兴趣,许多用户同时询问一些分析查询。
有什么可做的,因为我在单节点上工作,而在群集上进入生产环境时,问题会消失了吗?
奖励问题:我使用Impala尝试了该查询,它执行了大约40秒,但比spark.sql更好。Impala如何比Spark更好?
小智 5
哪种是执行spark.sql或dataframe语法的最佳方法?
没有任何区别。
给更多的执行者或记忆会有所帮助吗?
仅当问题不是由数据偏斜引起的并且您正确调整配置时。
我应该使用缓存吗?
如果输入数据被多次重用,那么从性能角度考虑(在您已经确定的情况下)可能是明智的。
有什么可做的,因为我在单节点上工作,而在群集上进入生产环境时,问题会消失了吗?
通常,在单个节点上进行性能测试是完全没有用的。它错过了瓶颈(网络IO /通信)和优势(摊销的磁盘I / O和资源使用)。
然而,你可以显著降低parallelsm( spark.sql.shuffle.partitions,sql.default.parallelism并增加了输入分配的大小)。专为分配负载而设计的Counter-inteutiv Spark样式并行性,对单台机器而言,不是资产,而是资产。与共享内存相比,它依赖于随机播放(磁盘写入!)进行通信,从而使事情变得极其缓慢,并且调度开销非常大。
Impala如何比Spark更好?
因为它是专为低延迟并发查询而设计的。这不是Spark的目标(数据库与ETL框架)。
当你
因为我们对并发感兴趣,所以许多用户同时询问一些分析性查询。
Spark听起来似乎不是正确的选择。
| 归档时间: |
|
| 查看次数: |
2537 次 |
| 最近记录: |