MQ以异步方式处理,聚合和发布数据

Lor*_*tté 7 java notifications dataflow message-queue redis

一些背景,在得到真正的问题之前:

我正在开发一个由几个不同模块组成的后端应用程序.目前,每个模块都是一个命令行java应用程序,它是"按需"运行的(稍后会详细介绍).

每个模块都是一个"步骤",是一个更大的过程的一部分,您可以将其视为数据流; 第一步从外部源收集数据文件并将其推送/加载到某些SQL数据库表中; 然后根据不同的条件和事件(时间,数据库中存在数据,通过Web服务/ Web界面完成的消息和详细说明),从(1个或多个)DB表中获取数据,处理它们,并将它们写在不同的表格上.步骤在三个不同的服务器上运行,并从三个不同的DB读取数据,但只能在一个DB中写入.目的是汇总数据,计算指标和统计数据.

目前,每个模块都是定期执行的(从第一个模块的几分钟/小时,到链中最后一个模块的几天,需要聚合更多数据,因此等待"更长时间"从它们可用),使用的cronjob.运行一个模块(当前是一个java控制台应用程序),它会检查数据库中给定日期时间窗口中新的未处理信息,并完成其工作.

问题:它有效,但是......我需要扩展和维护它,这种方法开始显示其局限性.

  1. 我不喜欢依靠"民意调查"; 这是浪费,考虑到以前模块的信息足以在他们需要的信息可用时"告诉"链中的其他模块,并且他们可以继续.
  2. 它"缓慢":链条上的模块延迟了几天,因为我们必须确保数据是由前面的模块到达和处理的.所以我们"停止"这些模块,直到我们确定我们拥有所有数据.新增功能需要实时(不是很难,但"尽快")计算某些指标.一个很好的例子就是在这里发生的事情,在SO上,带有徽章!:)我需要获得一些非常相似的东西.

为了解决第二个问题,我将介绍"部分"或"增量"计算:只要我有一组相关信息,我就会处理它.然后,当一些其他链接信息到达时,我计算差异并相应地更新数据,但我还需要通知其他(从属)模块.

问题

- 1)哪种方法最好? - 2)相关:哪些是"通知"其他模块(在我的情况下是java可执行文件)相关数据可用的最佳方式?

我可以看到三种方式:

  • 将其他"非数据"表添加到数据库中,每个模块写入"嘿,我已经完成了这个并且它可用".当cronjob启动另一个模块时,它会读取表格,确定他可以计算子集xxx,然后执行.等等
  • 使用Message Queues,如ZeroMQ(或Apache Camel,如@mjn建议)而不是DB表
  • 使用像Redis这样的键值存储,而不是数据库表

编辑:我确信基于队列的方法是要走的路,我为完整性添加了"table + polling"选项,但现在我明白这只是一种分心(显然,每个人都会回答"是的,使用队列,民意调查是邪恶的" - 这是正确的!".因此,让我重新解释一下这个问题: 使用像Redis这样的pub/sub的键值存储使用MQ的优点/缺点是什么?

  • 3)有没有任何解决方案可以帮我完全摆脱cronjobs?

编辑:特别是,在可能的情况下,它意味着:在某些MQ和/或键值存储中是否有一种机制可以让我发布带有"时间"的消息?比如"在1天内交付"?显然,持久性和"几乎一次"交付保证

  • 4)我应该将此消息(基于事件?)的解决方案构建为集中式服务,并将其作为其中一个服务器上的守护程序/服务运行吗?
  • 5)我是否应该放弃按需启动订阅者的想法,让每个模块作为守护进程/服务连续运行?
  • 6)哪些是赞成和缺点(可靠性,单点故障与资源使用和复杂性......)?

编辑:这是我最关心的一点:我想"排队"自己根据队列中的消息激活"模块",类似于MSMQ激活.这是个好主意吗?Java世界中有什么东西可以实现它,我应该自己实现它(通过MQ还是通过Redis),还是应该将每个模块作为守护进程运行?(即使某些计算通常是在突发中发生,两小时处理然后是两天的空转?)

注意:我不能使用重型容器/ EJB(No Glassfish或类似产品)

编辑:骆驼对我来说似乎有点太重了.无论是在资源还是开发的复杂性方面,我都在寻找一些非常轻松的东西

Lor*_*tté 0

实现之后,我觉得回答我自己的问题对于将来访问 StackOverflow 的人来说是有好处的。

最后我选择了Redis。它确实非常快且可扩展。而且我非常喜欢它的灵活性:它比消息队列灵活得多。我是否断言 Redis 在 MQ 方面比现有的各种 MQ 更好?嗯,就我的具体情况而言,我相信是这样。要点是:如果某些东西没有提供现成的,您可以构建它(通常使用 MULTI - 但您甚至可以使用 LUA 进行更高级的定制!)。

例如,我遵循这个很好的答案来实现“持久”、可恢复的发布/订阅(即允许客户端死亡并重新连接而不会丢失消息的发布/订阅)。

这帮助我满足了可扩展性和“可靠性”要求:我决定保持管道中的每个部分独立(目前是守护进程),但添加一个检查 Redis 上的列表/队列的监视器;如果某些内容没有被消耗(或消耗得太慢),监视器会生成一个新的消费者。我也在想真正的“弹性”,增加消费者在无事可做时自杀的能力。

另一个例子:执行计划的活动。我正在遵循这种目前看来很流行的方法。但我渴望尝试keyspace notification,看看过期密钥和通知的组合是否是一种更好的方法。

最后,作为访问 Redis 的库,我选择了 Jedis:它很流行、受支持,并且提供了一个很好的接口来实现 pub/sub 作为侦听器。这不是 Scala 的最佳方法(惯用),但效果很好。