连续聚合大型数据集

KGo*_*KGo 7 python mysql cron aggregate

我正在尝试一种算法来解决我遇到的这个问题.这不是硬件问题,而是我正在研究的一个副项目.

有一个表A大约有10 ^ 5行的顺序,并且每天以10 ^ 2的顺序添加新的.

表B具有10 ^ 6行的顺序,并且每天以10 ^ 3添加新的.从A到B有一对多关系(A中某些行有很多B行).

我想知道如何为这种数据进行连续聚合.我希望每隔约10分钟运行一次这样的工作并执行此操作:对于A中的每一行,查找在最后一天,一周和一个月中创建的B中与其相关的每一行(然后按计数排序)并保存他们在不同的数据库或缓存他们.

如果这令人困惑,这里有一个实际的例子:Say表A有亚马逊产品,表B有产品评论.我们希望显示最近4小时,每天,每周等评价最高的产品的分类列表.新产品和评论会快速添加,我们希望上述列表为最新版本尽可能.

我当前的实现只是一个for循环(伪代码):

result = []

for product in db_products:
    reviews = db_reviews(product_id=product.id, create>=some_time)
    reviews_count = len(reviews)
    result[product]['reviews'] = reviews
    result[product]['reviews_count'] = reviews_count

sort(result, by=reviews_count)

return result
Run Code Online (Sandbox Code Playgroud)

我每小时都这样做,并将结果保存在json文件中以供服务.问题是这不能很好地扩展,并且需要很长时间来计算.

那么,我在哪里可以解决这个问题呢?

更新:

谢谢您的回答.但我最终学习并使用Apache Storm.

Jan*_*sky 1

要求摘要

数据库中有两个更大的表,您需要定期创建过去时间段(小时、天、周等)的一些聚合,并将结果存储在另一个数据库中。

我假设,一旦过去一段时间,相关记录就不会发生变化,换句话说,过去一段时间的汇总始终具有相同的结果。

建议的解决方案:路易吉

Luigi是用于管道相关任务的框架,典型用途之一是计算过去时期的聚合。

概念如下:

  • 编写简单的任务实例,它定义所需的输入数据、输出数据(称为目标)和创建目标输出的过程。
  • 任务可以参数化,典型的参数是时间段(特定的天、小时、周等)
  • 路易吉可以在中间停止任务并稍后开始。它将考虑任何目标已存在的任务来完成,并且不会重新运行它(您必须删除目标内容才能让它重新运行)。

简而言之:如果目标存在,则任务完成。

这适用于多种类型的目标,例如本地文件系统、hadoop、AWS S3 以及数据库中的文件。

为了防止半途而废,目标实现会考虑原子性,因此,例如,文件首先在临时位置创建,并在完成后立即移动到最终目的地。

数据库中存在一些结构来表示某些数据库导入已完成。

您可以自由创建自己的目标实现(它必须创建一些东西并提供方法exists来检查,结果是否存在。

使用 Luigi 完成您的任务

对于您描述的任务,您可能会发现您需要的一切都已经存在。只是一些提示:

luigi.postgres.CopyToTable类允许将记录存储到 Postgres 数据库中。目标将自动创建所谓的“标记表”,其中它将标记所有已完成的任务。

其他类型的数据库也有类似的类,其中之一使用 SqlAlchemy,它可能会覆盖您使用的数据库,请参阅类luigi.contrib.sqla.CopyToTable

Luigi 文档是将数据导入 sqlite 数据库的工作示例

在 StackOverflow 的答案中,完整的实现是不可能的,但我确信,您会遇到以下情况:

  • 完成任务的代码非常清晰 - 没有样板代码,只写必须完成的事情。
  • 对处理时间段的良好支持 - 即使从命令行,请参阅例如 有效触发重复任务。它甚至会注意不要在过去走得太远,以防止生成太多任务可能使服务器超载(默认值设置得非常合理并且可以更改)。
  • 在多个服务器上运行任务的选项(使用 Luigi 实现提供的中央调度程序)。

我已经用 Luigi 处理了大量的 XML 文件,还做了一些任务,将聚合数据导入数据库并可以推荐它(我不是 Luigi 的作者,我只是快乐的用户)。

加快数据库操作(查询)

如果您的任务执行数据库查询的执行时间太长,您没有什么选择:

  • 如果您使用 Python 计算每个产品的评论,请考虑尝试 SQL 查询 - 它通常要快得多。应该可以创建 SQL 查询,该查询使用count正确的记录并直接返回您需要的数字。您甚至group by可以一次性获得所有产品的摘要信息。
  • 设置适当的索引,可能在“产品”和“时间段”列的“评论”表上。这将加快查询速度,但请确保,它不会过多地减慢插入新记录的速度(过多的索引可能会导致这种情况)。

有可能发生这样的情况:通过优化的 SQL 查询,即使不使用 Luigi,您也可以获得有效的解决方案。