Dav*_*rio 5 amazon-redshift apache-spark apache-spark-sql spark-dataframe
我有数据,我想做很多分析查询,我想弄清楚是否有一个机制,我可以用来存储它,以便Spark可以有效地对它进行连接.我有一个使用RedShift的解决方案,但理想情况下更喜欢在S3中使用基于文件的内容,而不是全天候提供整个RedShift群集.
这是一个简化的例子.我们有2个初始CSV文件.
这两个表通过person_id字段链接.person_id在Person表中是唯一的.事件与人有多对一的关系.
我想了解如何设置数据,以便我可以有效地执行以下查询.我需要执行这样的许多查询(所有查询都是基于每个人进行评估):
查询是生成一个包含4列的数据框,每个人都有1行.
我在解决这个问题时使用Spark的所有当前解决方案都涉及重新调整所有数据,最终导致大量(数亿人)的流程变慢.我很高兴有一个解决方案,要求我重新整理数据并将其写入不同的格式,如果这样可以加快以后的查询速度.
我可以使用RedShift以相当简单的方式完成此解决方案:
每个文件都作为RedShift表加载,使用DISTKEY person_id,SORTKEY person_id.这会分发数据,以便人员的所有数据都在一个节点上.以下查询将生成所需的数据框:
select person_id, age, e.cost from person
left join (select person_id, sum(cost) as cost from events
where date between '2013-06-01' and '2013-06-30'
group by person_id) as e using (person_id)
Run Code Online (Sandbox Code Playgroud)
我已经想到了几种在Spark中处理这个问题的潜在方法,但是没有一种能够实现我的需求.我的想法和问题如下:
对于我的用例,Spark优于RedShift,这在这个简单的例子中并不明显,所以我更喜欢用Spark做这个.如果我遗漏了一些东西并且有一个很好的方法,请告诉我.
根据评论进行编辑。
假设:
这是我会尝试的:
val eventAgg = spark.sql("""select person_id, sum(cost) as cost
from events
where date between '2013-06-01' and '2013-06-30'
group by person_id""")
eventAgg.cache.count
val personDF = spark.sql("""SELECT person_id, age from person""")
personDF.cache.count // cache is less important here, so feel free to omit
eventAgg.join(personDF, "person_id", "left")
Run Code Online (Sandbox Code Playgroud)
我只是用我的一些数据进行了此操作,结果如下(9 个节点/140 个 vCPU 集群,约 600GB RAM):
27,000,000,000 个“事件”(总计 14,331,487 个“人”)
64,000,000“人”(~20 列)
聚合事件构建和缓存大约需要 3 分钟
人们缓存花了大约 30 秒(从网络拉取,而不是镶木地板)
左加入花了几秒钟
不缓存“人员”导致连接时间延长了几秒钟。然后强制 Spark 广播数百 MB 聚合事件,使连接耗时不到 1 秒。
| 归档时间: |
|
| 查看次数: |
510 次 |
| 最近记录: |