KGo*_*KGo 7 python mysql cron aggregate
我正在尝试一种算法来解决我遇到的这个问题.这不是硬件问题,而是我正在研究的一个副项目.
有一个表A大约有10 ^ 5行的顺序,并且每天以10 ^ 2的顺序添加新的.
表B具有10 ^ 6行的顺序,并且每天以10 ^ 3添加新的.从A到B有一对多关系(A中某些行有很多B行).
我想知道如何为这种数据进行连续聚合.我希望每隔约10分钟运行一次这样的工作并执行此操作:对于A中的每一行,查找在最后一天,一周和一个月中创建的B中与其相关的每一行(然后按计数排序)并保存他们在不同的数据库或缓存他们.
如果这令人困惑,这里有一个实际的例子:Say表A有亚马逊产品,表B有产品评论.我们希望显示最近4小时,每天,每周等评价最高的产品的分类列表.新产品和评论会快速添加,我们希望上述列表为最新版本尽可能.
我当前的实现只是一个for循环(伪代码):
result = []
for product in db_products:
reviews = db_reviews(product_id=product.id, create>=some_time)
reviews_count = len(reviews)
result[product]['reviews'] = reviews
result[product]['reviews_count'] = reviews_count
sort(result, by=reviews_count)
return result
Run Code Online (Sandbox Code Playgroud)
我每小时都这样做,并将结果保存在json文件中以供服务.问题是这不能很好地扩展,并且需要很长时间来计算.
那么,我在哪里可以解决这个问题呢?
更新:
谢谢您的回答.但我最终学习并使用Apache Storm.
数据库中有两个更大的表,您需要定期创建过去时间段(小时、天、周等)的一些聚合,并将结果存储在另一个数据库中。
我假设,一旦过去一段时间,相关记录就不会发生变化,换句话说,过去一段时间的汇总始终具有相同的结果。
Luigi是用于管道相关任务的框架,典型用途之一是计算过去时期的聚合。
概念如下:
简而言之:如果目标存在,则任务完成。
这适用于多种类型的目标,例如本地文件系统、hadoop、AWS S3 以及数据库中的文件。
为了防止半途而废,目标实现会考虑原子性,因此,例如,文件首先在临时位置创建,并在完成后立即移动到最终目的地。
数据库中存在一些结构来表示某些数据库导入已完成。
您可以自由创建自己的目标实现(它必须创建一些东西并提供方法exists来检查,结果是否存在。
对于您描述的任务,您可能会发现您需要的一切都已经存在。只是一些提示:
luigi.postgres.CopyToTable类允许将记录存储到 Postgres 数据库中。目标将自动创建所谓的“标记表”,其中它将标记所有已完成的任务。
其他类型的数据库也有类似的类,其中之一使用 SqlAlchemy,它可能会覆盖您使用的数据库,请参阅类luigi.contrib.sqla.CopyToTable
Luigi 文档是将数据导入 sqlite 数据库的工作示例
在 StackOverflow 的答案中,完整的实现是不可能的,但我确信,您会遇到以下情况:
我已经用 Luigi 处理了大量的 XML 文件,还做了一些任务,将聚合数据导入数据库并可以推荐它(我不是 Luigi 的作者,我只是快乐的用户)。
如果您的任务执行数据库查询的执行时间太长,您没有什么选择:
count正确的记录并直接返回您需要的数字。您甚至group by可以一次性获得所有产品的摘要信息。有可能发生这样的情况:通过优化的 SQL 查询,即使不使用 Luigi,您也可以获得有效的解决方案。