在Spark(一个RedShift)中是否有用于高效连接的数据架构?

Dav*_*rio 5 amazon-redshift apache-spark apache-spark-sql spark-dataframe

我有数据,我想做很多分析查询,我想弄清楚是否有一个机制,我可以用来存储它,以便Spark可以有效地对它进行连接.我有一个使用RedShift的解决方案,但理想情况下更喜欢在S3中使用基于文件的内容,而不是全天候提供整个RedShift群集.

数据简介

这是一个简化的例子.我们有2个初始CSV文件.

  • 人员记录
  • 事件记录

这两个表通过person_id字段链接.person_id在Person表中是唯一的.事件与人有多对一的关系.

目标

我想了解如何设置数据,以便我可以有效地执行以下查询.我需要执行这样的许多查询(所有查询都是基于每个人进行评估):

查询是生成一个包含4列的数据框,每个人都有1行.

  • person_id - 数据集中每个人的person_id
  • 年龄 - 来自人员记录的"年龄"字段
  • 成本 - "日期"在2013年6月期间该人员的所有事件记录的"成本"字段的总和

我在解决这个问题时使用Spark的所有当前解决方案都涉及重新调整所有数据,最终导致大量(数亿人)的流程变慢.我很高兴有一个解决方案,要求我重新整理数据并将其写入不同的格式,如果这样可以加快以后的查询速度.

使用RedShift的解决方案

我可以使用RedShift以相当简单的方式完成此解决方案:

每个文件都作为RedShift表加载,使用DISTKEY person_id,SORTKEY person_id.这会分发数据,以便人员的所有数据都在一个节点上.以下查询将生成所需的数据框:

select person_id, age, e.cost from person 
    left join (select person_id, sum(cost) as cost from events 
       where date between '2013-06-01' and '2013-06-30' 
       group by person_id) as e using (person_id)
Run Code Online (Sandbox Code Playgroud)

使用Spark/Parquet的解决方案

我已经想到了几种在Spark中处理这个问题的潜在方法,但是没有一种能够实现我的需求.我的想法和问题如下:

  • Spark数据集写'bucketBy' - 读取CSV文件,然后使用"bucketBy"将它们重写为镶木地板文件.这些镶木地板文件的查询可能非常快.这将生成类似于RedShift的数据设置,但镶木地板文件不支持bucketBy.
  • Spark镶木地板隔断 - Parquet确实支持分区.因为为每个分区键创建一个单独的文件集,您必须创建一个要分区的计算列,并使用person_id的哈希来创建partitionKey.但是,当您稍后在基于"partition_key"和"person_id"的spark中加入这些表时,查询计划仍会执行完整的散列分区.因此,这种方法并不比只读取CSV并每次都进行改组更好.
  • 存储在除镶木地板之外的其他一些数据格式 - 我对此持开放态度,但不知道其他可用的数据源.
  • 使用复合记录格式 - Parquet支持分层数据格式,因此可以将两个表预加入分层记录(其中人员记录具有"事件"字段,这是一个结构元素数组),然后对其进行处理.当您有分层记录时,有两种方法可以处理它:
    • **使用explode创建单独的记录** - 使用此方法将阵列字段分解为完整行,然后使用标准数据框操作进行分析,然后将它们连接回主表.不幸的是,我一直无法使用这种方法来有效地编译查询.
    • **使用UDF对子记录执行操作** - 这样可以保留结构并执行而不需要随机播放,但这是一种笨拙且冗长的编程方式.此外,它需要大量的UDF,这些UDF对性能不是很好(尽管它们击败了大规模的数据混乱).

对于我的用例,Spark优于RedShift,这在这个简单的例子中并不明显,所以我更喜欢用Spark做这个.如果我遗漏了一些东西并且有一个很好的方法,请告诉我.

Gar*_*n S 2

根据评论进行编辑。

假设:

  • 使用镶木地板

这是我会尝试的:

val eventAgg = spark.sql("""select person_id, sum(cost) as cost 
                            from events 
                            where date between '2013-06-01' and '2013-06-30' 
                            group by person_id""")
eventAgg.cache.count
val personDF = spark.sql("""SELECT person_id, age from person""")
personDF.cache.count // cache is less important here, so feel free to omit
eventAgg.join(personDF, "person_id", "left")
Run Code Online (Sandbox Code Playgroud)

我只是用我的一些数据进行了此操作,结果如下(9 个节点/140 个 vCPU 集群,约 600GB RAM):

27,000,000,000 个“事件”(总计 14,331,487 个“人”)

64,000,000“人”(~20 列)

聚合事件构建和缓存大约需要 3 分钟

人们缓存花了大约 30 秒(从网络拉取,而不是镶木地板)

左加入花了几秒钟

不缓存“人员”导致连接时间延长了几秒钟。然后强制 Spark 广播数百 MB 聚合事件,使连接耗​​时不到 1 秒。