优化分布式数据库聚合作业的网络带宽

Zim*_*oot 8 database algorithm optimization caching distributed-computing

我有一个分布式/联邦数据库,结构如下:

  1. 数据库分布在三个地理位置("节点")
  2. 每个节点都聚集了多个数据库
  3. 关系数据库是PostgreSQL,MySQL,Oracle和MS SQL Server的混合体; 非关系数据库是MongoDB或Cassandra
  4. 通过RabbitMQ实现每个节点内和节点联合内的松散耦合,每个节点运行RabbitMQ代理

我正在为跨越节点联合的作业实现一个只读的节点间聚合作业系统(即,对于非节点本地的作业).这些作业只执行"获取"查询 - 它们不会修改数据库.(如果作业的结果打算进入一个或多个数据库,那么这是通过一个单独的作业完成的,该作业不是我试图优化的节点间作业系统的一部分.)我的目标是最小化这些作业所需的网络带宽(首先是最小化节点间/ WAN带宽,然后最小化节点内/ LAN带宽); 我假设每个WAN链路的统一成本,以及每个LAN链路的另一个统一成本.这些工作对时间不是特别敏感.我在节点内执行一些CPU负载平衡,但不在节​​点之间执行.

对于聚合作业,通过WAN/LAN传输的数据量相对于群集本地或特定数据库的数据库写入量较小,因此在联合中完全分发数据库是不切实际的.

我用来最小化网络带宽的基本算法是:

  1. 如果作业运行在遍布联合的一组数据上,则管理器节点会向包含相关数据库查询的其他每个节点发送一条消息.
  2. 每个节点运行其查询集,使用gzip压缩它们,缓存它们,并将它们的压缩大小发送到管理器节点.
  3. 管理器移动到包含多个数据的节点(具体地说,移动到群集中具有最多数据且具有空闲核心的机器); 它从其他两个节点和集群中的其他机器请求其余数据,然后运行该作业.

在可能的情况下,作业使用分而治之的方法来最小化所需的数据共址量.例如,如果作业需要计算联合中所有销售数字的总和,则每个节点在本地计算其销售总额,然后在管理器节点聚合(而不是将所有未处理的销售数据复制到管理器节点) .但是,有时(例如,当在位于不同节点的两个表之间执行连接时)需要数据共址.

我做的第一件事就是聚合作业,并在十分钟时间内运行聚合作业(机器都运行NTP,所以我可以合理地确定"每十分钟"在每个节点上意味着相同的事情).目标是两个作业能够共享相同的数据,这降低了传输数据的总体成本.

  1. 给定两个查询同一个表的作业,我生成每个作业的结果集,然后我取两个结果集的交集.
  2. 如果两个作业都安排在同一节点上运行,则网络传输成本计算为两个结果集之和减去两个结果集的交集.
  3. 两个结果集存储到PostgreSQL临时表(在关系数据的情况下),或者存储在选择运行作业的节点上的临时Cassandra columnfamilies/MongoDB集合(在nosql数据的情况下); 然后,针对组合的结果集执行原始查询,并且传递的数据是针对各个作业的.(此步骤仅在组合结果集上执行;单个结果集数据只是传递到其作业,而不首先存储在临时表/列族/集合中.)

这导致网络带宽的改善,但我想知道是否有一个框架/库/算法可以改善这一点.我考虑的一个选项是在确定网络带宽时将结果集缓存在节点上并考虑这些缓存的结果集(即除了当前的预先安排的共址作业集之外,尝试在作业之间重用结果集,以便例如在一个10分钟的纪元中运行的作业可以使用前一个10分钟结果集中的缓存结果集,但除非作业使用完全相同的结果集(即除非他们使用相同的where子句),否则我不知道一般情况 - 填充结果集中间隙的目的算法(例如,如果结果集使用子句"其中N> 3"而另一个作业需要带有子句"where N> 0"的结果集,那么我可以使用什么算法确定我需要将原始结果集和结果集的并集与"其中N> 0和N <= 3"的子句一起使用 - 我可以尝试编写自己的算法来执行此操作,但结果将是越野车无用的混乱.我还需要确定缓存数据何时过时 - 最简单的方法是将缓存数据的时间戳与源表上最后修改的时间戳进行比较,如果时间戳已更改,则替换所有数据,但理想情况下我希望能够仅更新已按行或每块时间戳更改的值.

Zim*_*oot 4

我已经开始实施我的问题解决方案。

为了简化节点内缓存并简化 CPU 负载平衡,我在每个数据库集群(“Cassandra 节点”)上使用 Cassandra 数据库来运行聚合作业(之前我手动聚合本地数据库结果集) ) - 我使用单个 Cassandra 数据库来存储关系数据、Cassandra 和 MongoDB 数据(缺点是一些关系查询在 Cassandra 上运行速度较慢,但​​这可以通过单个统一聚合数据库更容易执行来弥补)维护而不是单独的关系和非关系聚合数据库)。我也不再在十分钟的时期内聚合作业,因为缓存使得这个算法变得不必要。

节点中的每台机器都引用一个名为 Cassandra_Cache_[MachineID] 的 Cassandra 列族,用于存储已发送到 Cassandra 节点的 key_ids 和 column_ids。Cassandra_Cache 列族由 Table 列、Primary_Key 列、Column_ID 列、Last_Modified_Timestamp 列、Last_Used_Timestamp 列以及由 Table|Primary_Key|Column_ID 组成的复合键组成。Last_Modified_Timestamp 列表示来自源数据库的数据的last_modified 时间戳,Last_Used_Timestamp 列表示聚合作业上次使用/读取数据的时间戳。当 Cassandra 节点向机器请求数据时,机器会计算结果集,然后获取结果集与其 Cassandra_Cache 中的表|键|列的集合差,这些表|键|列与其 Cassandra_Cache 中的行具有相同的 Last_Modified_Timestamp(如果时间戳不匹配,则缓存的数据已过时并与新的 Last_Modified_Timestamp 一起更新)。然后,本地计算机将设置的差异发送到 Cassandra 节点,并使用设置的差异更新其 Cassandra_Cache,并更新用于组成结果集的每个缓存数据的 Last_Used_Timestamp。(为每个表|键|列维护单独的时间戳的更简单的替代方法是为每个表|键维护一个时间戳,但这不太精确,并且表|键|列时间戳并不太复杂。) Cassandra_Cache 之间的同步仅要求本地计算机和远程节点发送与每个作业关联的 Last_Used_Timestamp,因为作业中的所有数据都使用相同的 Last_Used_Timestamp。

Cassandra 节点使用从节点内部接收的新数据以及从其他节点接收的数据更新其结果集。Cassandra 节点还维护一个列族,该列族存储与每台计算机的 Cassandra_Cache 中相同的数据(Last_Modified_Timestamp 除外,仅在本地计算机上需要它来确定数据何时过时),以及指示数据是否来自的源 ID来自节点内或来自另一个节点——该id区分不同节点内,但不区分本地节点内不同机器。(另一种选择是使用统一的 Cassandra_Cache,而不是在每台机器上使用一个 Cassandra_Cache 并为节点使用另一个 Cassandra_Cache,但我认为增加的复杂性不值得节省空间。)

每个 Cassandra 节点还维护一个 Federated_Cassandra_Cache,它由已从本地节点发送到其他两个节点之一的 {Database, Table, Primary_Key, Column_ID, Last_Used_Timestamp} 元组组成。

当作业通过管道时,每个 Cassandra 节点都会使用本地结果集更新其节点内缓存,并完成可以在本地执行的子作业(例如,在对多个节点之间的数据求和的作业中,每个节点将其自身的数据求和)节点内数据,以最大限度地减少需要在节点间联合中共同定位的数据量) - 如果子作业仅使用节点内数据,则可以在本地执行。然后,管理器节点确定在哪个节点上执行剩余的工作:每个 Cassandra 节点可以通过获取其结果集与已缓存的结果集子集的集合差值,在本地计算将其结果集发送到另一个节点的成本到其 Federated_Cassandra_Cache,并且管理器节点最小化成本方程 [“从 NodeX 传输结果集的成本”+“从 NodeY 传输结果集的成本”]。例如,Node1 将其结果集传输到 {Node2, Node3} 需要花费 Node1 {3, 5},Node2 将其结果集传输到 {Node1, Node3} 需要花费 {2, 2},Node3 需要花费 {4, 3}将其结果集传输到 {Node1, Node2},因此该作业在 Node1 上运行,成本为“6”。

我对每个 Cassandra 节点使用 LRU 驱逐策略;我最初使用最旧的优先驱逐策略,因为它更容易实现,并且需要更少的写入 Last_Used_Timestamp 列(每次数据更新一次,而不是每次数据读取一次),但 LRU 策略的实现结果并不过分复杂且 Last_Used_Timestamp 写入不会造成瓶颈。当 Cassandra 节点达到 20% 的可用空间时,它会逐出数据,直到达到 30% 的可用空间,因此每次逐出的大小大约为总可用空间的 10%。节点维护两个时间戳:最后驱逐的节点内数据的时间戳,以及最后驱逐的节点间/联邦数据的时间戳;由于节点间通信相对于节点内通信的延迟增加,逐出策略的目标是让 75% 的缓存数据为节点间数据,25% 的缓存数据为节点内数据,可以通过让每次驱逐的 25% 为节点间数据、每次驱逐的 75% 为节点内数据来快速近似。驱逐工作如下:

while(evicted_local_data_size < 7.5% of total space available) {
    evict local data with Last_Modified_Timestamp < 
        (last_evicted_local_timestamp += 1 hour)
    update evicted_local_data_size with evicted data
}

while(evicted_federated_data_size < 2.5% of total space available) {
    evict federated data with Last_Modified_Timestamp < 
        (last_evicted_federated_timestamp += 1 hour)
    update evicted_federated_data_size with evicted data
}
Run Code Online (Sandbox Code Playgroud)

在从节点内的机器和其他节点收到驱逐确认之前,被驱逐的数据不会被永久删除。

然后,Cassandra 节点向其节点内的计算机发送通知,指示新的 last_evicted_local_timestamp 是什么。本地计算机更新其 Cassandra_Cache 以反映新的时间戳,并在完成后向 Cassandra 节点发送通知;当 Cassandra 节点收到来自所有本地计算机的通知时,它会永久删除被逐出的本地数据。Cassandra 节点还会向远程节点发送带有新的 last_evicted_federated_timestamp 的通知;其他节点更新其 Federated_Cassandra_Caches 以反映新的时间戳,并且 Cassandra 节点在收到来自每个节点的通知时永久删除被驱逐的联邦数据(Cassandra 节点会跟踪数据来自哪个节点,因此在收到驱逐后来自 NodeX 的确认(节点可以在收到来自 NodeY 的驱逐确认之前永久删除被驱逐的 NodeX 数据)。在所有计算机/节点发送通知之前,如果 Cassandra 节点从尚未逐出旧数据的计算机/节点接收到结果集,则它会在其查询中使用缓存的逐出数据。例如,Cassandra 节点有一个已驱逐的本地 Table|Primary_Key|Column_ID 数据,同时本地计算机(尚未处理驱逐请求)未在其结果集中包含 Table|Primary_Key|Column_ID 数据,因为它认为Cassandra 节点的缓存中已包含数据;Cassandra 节点从本地计算机接收结果集,并且由于本地计算机尚未确认逐出请求,因此 Cassandra 节点将缓存的逐出数据包含在其自己的结果集中。