MongoDb实时(或接近实时)流式传输插入的数据

Nig*_*olf 6 java streaming real-time mongodb nosql

我有许多MongoDB集合,它们从各种流媒体源中获取大量JSON文档.换句话说,有许多进程不断地将数据插入一组MongoDB集合中.

我需要一种方法将数据从MongoDB流式传输到下游应用程序.所以我想要一个概念上看起来像这样的系统:

App Stream1 --> 
App Stream2 -->     MONGODB     --->  Aggregated Stream
App Stream3 -->
Run Code Online (Sandbox Code Playgroud)

或这个:

App Stream1 -->                 --->  MongoD Stream1
App Stream2 -->     MONGODB     --->  MongoD Stream2
App Stream3 -->                 --->  MongoD Stream3
Run Code Online (Sandbox Code Playgroud)

问题是如何在不必连续轮询/查询数据库的情况下从Mongo流式传输数据?

最明显的问题的答案是"你为什么不改变这些应用程序流的过程将消息发送到队列像兔子,零个或ActiveMQ的,然后有他们马上发送到您的蒙戈流流程和蒙戈像这样":

                 MONGODB
                   /|\  
                    |
App Stream1 -->     |          --->  MongoD Stream1
App Stream2 -->  SomeMQqueue   --->  MongoD Stream2
App Stream3 -->                --->  MongoD Stream3
Run Code Online (Sandbox Code Playgroud)

理想的世界是好的,但是我们需要Mongo来确保首先保存消息,以避免重复并确保ID全部生成等.Mongo必须作为持久层坐在中间.

那么我如何将消息从Mongo集合(不使用GridFS等)流式传输到这些下游应用程序中.基本思路只是轮询新文档,并且每个收集的文档都通过向存储在数据库中的JSON文档添加另一个字段来更新它,就像存储处理时间戳的SQL表中的进程标志一样.即每1秒轮询一次处理的文件== null .... add processed = now()....更新文件.

是否有更整洁/更有计算效率的方法?

仅供参考 - 这些都是Java进程.

干杯!

lob*_*234 3

如果要写入上限集合(或多个集合),则可以使用 tailablecursor新数据推送到流中,或推送到可以将其流出的消息队列中。然而,这对于无上限的集合不起作用。