如何在AWS中高效聚合数十亿条单独记录中的数据?

ste*_*esu 2 analytics amazon-web-services amazon-emr amazon-redshift amazon-athena

在高/理论水平上,我确切地知道我想要构建的架构类型以及它将如何工作,但我试图使用 AWS 服务尽可能便宜地构建它,而我对 AWS 产品的不熟悉让我围着圈跑。

数据

我们运营一个视频流平台。在繁忙的夜晚,我们会同时进行约 100 个直播,观看人数超过 30,000 人。我们预计这个数字在未来几年内将增至 100,000。直播平均持续 2 小时。

我们每 10 秒从播放器发送一次心跳,其中包含有关观看者的信息 - 他们观看了多少数据、他们缓冲了多少数据、他们的流媒体质量如何,等等。

这些心跳直接发送到 AWS Kinesis 终端节点。

最后,我们希望将所有过去的消息保留至少 5 年(希望更长),以便我们可以查看历史分析。

一些粗略计算表明,五年后我们将拥有 0.1 * 60 * 60 * 2 * 100000 * 365 * 5 = 1310亿条心跳消息。

我们的旧管道

我们的旧系统只有一个 Kinesis 消费者。聚合数据存储在 DynamoDB 中。每当消息到达时,我们都会从 DynamoDB 读取记录,更新记录,然后写回新记录。这种读取-更新-写入循环限制了我们处理消息的速度,并且使得每个传入的消息都依赖于它之前的消息,因此它们无法并行处理。

这种设置的部分原因是我们的消息模式从一开始就设计得不好。我们发送消息发送的时间戳,但我们不发送“自上次心跳以来观看的视频量”。因此,为了计算总观看时间,我们需要查找该播放器发送的最后一条心跳消息,减去时间戳,然后添加值。许多其他指标也存在类似的问题。

我们的新管道

我们已经开始遇到扩展问题。在高峰时段,在等待处理积压的消息时,分析可能会延迟多达四个小时。如果积压达到 24 小时,Kinesis 将开始删除数据。因此,我们需要修复管道以消除对过去消息的依赖,以便我们可以并行处理它们。

第一部分是更新我们的玩家发送的消息。我们的新规范仅包含可以简单求和而无需减法的指标。因此,例如,我们可以继续添加“查看时间”指标,而不考虑过去的消息。

第二部分是确保 Kinesis 永远不会备份。我们在原始消息到达后立即将其转储到 S3,无需进行任何处理(Kinesis Data Fire Hose),以便我们可以在闲暇时对它们进行分析。

最后,我们现在希望尽快从这些分析中实际提取信息。这就是我遇到了障碍的地方。

我们想要回答的问题

由于这是一个分析管道,我们的问题主要围绕过滤这些消息,然后聚合剩余消息的字段(可能,实际上很可能,通过分组)。例如:

有多少 Android 用户观看了昨晚的高清直播?(按流和操作系统过滤)

所有用户的平均带宽使用情况是多少?(SUM 和 COUNT,稍后可以在仪表板端完成最终聚合的划分)

去年使用 Apple 设备(iOS、tvOS 等)的用户比例是多少?(COUNT,按操作系统分组)

去年 Android 用户缓冲流媒体的平均时间是多少?(以上所有内容的混合)

选项

  • AWS Athena 允许我们直接查询 S3 中的数据,就像查询 ANSI SQL 表一样。然而,阅读 Athena 时,除非数据格式正确,否则速度可能会非常慢。我见过的一些基准测试表明,处理 11 亿行 CSV 数据最多可能需要 2 分钟。我正在考虑处理 100 倍的数据
  • AWS EMR 和 AWS Redshift 听起来像是为此目的而构建的,但设置起来很复杂,并且运行的基本成本很高(需要 EC2 集群始终保持活动状态)。AWS Redshift 还需要将数据加载到其中,这听起来可能是一个非常缓慢的过程,延迟了我们对分析的访问
  • AWS Glue 听起来似乎能够在原始消息到达 S3 时获取它们并将其转换为 Parquet 文件,以便通过 Athena 进行更快速的查询
  • 我们可以运行一个作业来定期批处理消息,以减少必须处理的总数。当直播时,我们每 10 秒就会收到一条消息,但我们实际上只关心给定观看者的总数。这意味着,当 2 小时的直播结束时,我们可以将从该玩家收到的 720 条消息合并为一条有关观众在整个直播期间体验的“摘要”消息。这将大大减少我们需要处理的数据量,但我不清楚如何以及何时触发此过程

理想的架构

这是一个大数据问题。大数据问题的通用解决方案是“不要将数据带入查询,而是将查询带入数据”。如果这些消息分布在 100 个小型存储节点上,那么每个节点都可以对它们所保存的数据子集进行过滤、求和和计数,并将这些聚合传递回中央节点,由中央节点对总和和计数进行求和。如果每个节点仅运行数据集的 1/100,那么理论上这种处理速度会非常快。

我的困惑

虽然我对“理想”架构有理论上的了解,但我不清楚 AWS 是否以这种方式工作,或者如何构建一个像这样运行良好的系统。

  • S3是一个黑匣子。目前尚不清楚 Athena 查询是否在各个节点上运行并且聚合在其他地方进一步减少,或者是否有一个系统读取所有数据并将其聚合在一个中心位置
  • Redshift 需要将数据复制到 Redshift 数据库中。这听起来不快,也不分布式
  • 我不清楚 EMR 是如何工作的,也不清楚它是否适合我的目的。仍在研究中
  • AWS Glue 似乎可能需要由某些事件触发?
  • Parquet 文件似乎类似于 CSV,其中多个记录驻留在一个文件中。与此同时,我正在转储每个文件一条记录。但也许有办法解决这个问题?例如每分钟或每 5 分钟批处理文件?
  • RDS 或类似的服务可能对此非常有用(索引等),但需要有保证的架构(或者如果我们的消息架构发生更改则需要迁移),这是一个问题。如果我们更改消息模式,则迁移数 TB 的数据听起来是不可能的

最后,除了希望尽可能“实时”获得分析结果(理想情况下,我们希望在 1 分钟内知道有人加入或离开流),我们还希望仪表板能够快速加载。等待 30 秒才能看到现场观众的数量是可怕的。仪表板应在 2 秒或更短的时间内加载(理想情况下)

计划是使用 QuickSight 创建仪表板(我们的旧系统有一个 hack-y Django 应用程序,可以从我们的 DynamoDB 聚合表中读取数据,但我想避免创建更多代码供人们维护)

Bil*_*ner 5

我希望您能从众多专家那里得到很多不同的答案和意见。由于存在很多变量,因此可能没有单一的最佳答案。让我根据我在该领域的经验为您提供最好的建议。

Kinesis 到 S3 是一个良好的开始,不移动超出需要的数据是正确的理念。

您没有提到 Kinesis Data Analytics,这可能是满足您某些需求的解决方案。它最适合回答有关数据源中当前发生的情况的问题。较长时间范围的问题更适合您提到的工具。如果您对过去 10 分钟(左右)发生的事情不太感兴趣,最好忽略。

S3 组织对于直接对其中的数据执行任何分析至关重要。您提到镶木地板格式化很好,但分区功能更强大。将 S3 数据组织成“天”或“小时”的数据并基于此设置分区可以大大加快任何在所需时间量有限的查询(不要读取不需要的内容) )。

关于 S3 的重要安全说明 - S3 是一个对象存储,因此您引用的每个对象都会产生开销。无论您采用哪种解决方案,将许多小对象(10,000+)视为一组数据都会很慢。在继续采取任何解决方案之前,您需要解决这个问题。您会发现在 S3 中查找对象需要 0.5 秒以上的时间,但如果文件很小,则传输时间几乎为零。现在将您拥有的所有对象乘以 0.5 秒,看看读取它们需要多长时间。这不是您选择的下游工具的功能,而是您拥有的 S3 组织的功能。作为大数据解决方案一部分的 S3 对象大小应至少为 100M,以免对象查找时间受到太大影响。如果不首先解决对象大小和分区问题,那么 parquet 或 CSV 文件的选择是无声的。

Athena 非常适合偶尔查询,尤其是在日期范围有限的情况下。这是您期望的查询模式吗?正如您所说的“将计算移至数据”,但如果您使用 Athena 进行需要使用大部分数据的大型横截面分析,则每次执行此查询时,您只需将数据移至 Athena。不要在存储数据时停止考虑数据移动 - 还要考虑数据移动以进行分析。

因此,一个大问题是需要多少数据以及多久才能支持您的分析工作负载和 BI 功能?这就是您正在寻找的最终结果。如果经常需要较高比例的数据,那么像 Redshift 这样将数据加载到磁盘的仓库解决方案是正确的答案。Redshift 的数据加载时间非常快,因为它并行加载来自 S3 的数据(您可以看到 S3 是一个集群,Redshift 是一个集群,可以并行加载)。如果您需要将所有数据加载到 Redshift 中,那么加载时间不是您主要关心的问题 - 成本才是。功能强大的工具,价格也相匹配。对于大数据规模的集群,新的 RA3 实例类型显着降低了这条曲线,因此这是一种可能性。

您没有提到的另一个工具是 Redshift Spectrum。这将多种对您来说可能很重要的强大技术结合在一起。首先是 Redshift 的强大功能,能够选择通常用于您的数据大小的较小集群大小。S3 过滤和聚合技术允许 Spectrum 对 S3 中的数据执行操作(是的,查询的初始计算操作在 S3 内部执行,可能会大大减少移动到 Redshift 的数据)。如果您的查询模式支持 S3 中的这种数据缩减,那么数据移动将会很小,并且 Redshift 集群也可以很小(便宜)。对于像您这样的物联网解决方案来说,这可能是一个强大的折衷点,因为不需要复杂的数据模型和连接。

您提出胶水并转换为镶木地板。这样做可能很好,但正如我之前提到的,S3 中的数据分区通常更强大。Parquet 的价值将随着数据宽度的增加而增加。Parquet 是一种柱状格式,因此如果只需要“列”的子集,那么它是有利的。缺点是转换时间/成本以及易读性的损失(在调试过程中可能会很大)。

EMR 是您提到的另一个选择,但我通常建议客户不要使用 EMR,除非他们需要 EMR 为分析带来的灵活性,并且他们有能力很好地使用它。如果没有这些,EMR 往往会造成不必要的成本下降。

如果这真的是一个大数据解决方案,那么 RDS(和 Aurora)不是好的选择。它们是为事务性工作负载而不是分析而设计的。数据规模和分析无法很好地契合或不具有成本效益。

该领域的另一个工具是 S3 Select。不太可能是您正在寻找的东西,但存在一些值得记住的东西,并且可以成为工具箱中的工具。

如果基于某些因素存在不同的需求,混合解决方案在该领域很常见。常见的情况是“一天中的某个时间”——没有人在凌晨 3 点运行大量报告,因此所需的性能要少得多。另一个是用户组——一些组需要简单的分析,而另一些组则需要更多的能力。另一个因素是数据的及时性——每个人都需要“直到秒”的信息还是每天的信息就足够了?试图拥有一种始终为每个人做所有事情的工具通常会导致昂贵的、超大的解决方案。

由于 Redshift Spectrum 和 Athena 可以指向相同的 S3 数据(组织良好,因为两者都会受益),这两个工具可以在相同的数据上共存。此外,Redshift 非常适合筛选大量数据,非常适合生成汇总表,然后将它们(以分区 parquet 形式)写入 S3 供 Athena 等工具使用。所有这些云服务都可以按计划运行,其中包括 Redshift 和 EMR(Athena 是按需查询),因此它们不需要一直运行。Redshift with Spectrum 可以每天运行几个小时来执行深度分析并汇总数据以写入 S3。您的数据科学家还可以使用 Redshift 来完成核心工作,而 Athena 支持使用每日摘要数据和 Kinesis Data Analytics 作为源的仪表板。

最后,您提出了仪表板 2 秒的要求。这对于 Redshift 或 Athena 支持的 Quicksight 来说绝对是可能的,但对于任意复杂/数据密集型查询来说却无法满足。为了满足这一点,您需要引擎有足够的马力来生成相关数据。具有本地数据存储的 Redshift 可能是最快的(在某些情况下,在 S3 中进行一些数据修剪的 Redshift Spectrum 会获胜),而 Athena 是最弱/最慢的。但是,如果工作量很小,那么能力并不重要——您的查询工作量将是一个巨大的决定因素。最快的方法是将所需的数据加载到 Quicksight 存储 (SPICE) 中,但这是数据的另一个本地化/汇总版本,因此及时性再次成为一个因素(更新的频率)。

基于设计类似的系统和对您需要什么的一系列猜测,我建议您:

  1. 修复对象大小(可以配置 Kineses 来执行此操作)
  2. 按天对数据进行分区
  3. 设置小型 Redshift 集群 (4 X dc2.large) 并使用 Spectrum 源地址数据
  4. 将 Quicksight 连接到 Redshift
  5. 衡量性能(和成本)并与需求进行比较(可能存在差距)
  6. 调整解决方案(S3、Athena、SPICE 等汇总表)以实现目标

另一种方法是聘请以前建立过此类系统的人,让他们详细审查要求并提出较少“基于猜测”的建议。