我的 Node 应用程序使用 Mongo 更改流,并且该应用程序在生产中运行 3 个以上实例(最终会更多,因此随着它的增长,这将成为一个更大的问题)。因此,当发生变更时,变更流功能运行的次数与进程的数量一样多。
如何进行设置以使更改流仅运行一次?
这是我所得到的:
const options = { fullDocument: "updateLookup" };
const filter = [
{
$match: {
$and: [
{ "updateDescription.updatedFields.sites": { $exists: true } },
{ operationType: "update" }
]
}
}
];
const sitesStream = Client.watch(sitesFilter, options);
// Start listening to site stream
sitesStream.on("change", async change => {
console.log("in site change stream", change);
console.log(
"in site change stream, update desc",
change.updateDescription
);
// Do work...
console.log("site change stream done.");
return;
});
Run Code Online (Sandbox Code Playgroud) 是否可以广泛使用变更流?我想观看许多带有各种参数的文档的集合。这个想法是允许多个用户观看他们感兴趣的数据。因此,不仅要显示一些实时更新,例如来自单个集合或其他内容的一些股票数据,而且要允许现代 Web 应用程序是实时的时间。我偶然发现了一些讨论,例如这个讨论表明该功能不可用于此类目的。
想象一下实施众所周知的社交网络。每个用户都希望获得以下方面的实时数据:(1) 通知、(2) 在线好友、(3) 好友请求、(4) 新闻源、(5) 对新闻源帖子的评论(也许每个帖子一个?)。这使得每个用户至少有 5 个开放的变更流。如果一项服务连接了 10000 个用户,那么它会产生 50000 个活动变更流。
该机制是否准备好承受这样的负载?如果我理解了讨论(以及其他一些讨论),那么每个变更流观察者都会创建一个连接。拥有数万个连接可以吗?这似乎不是一个好的设计。似乎最好在应用程序服务器上监视每个集合并进行过滤,但这更多是数据库服务器的工作。
有没有办法如何使用 mongo db 处理这样的负载?
我有一个包含两个集合的 MongoDB,Items以及Calculations.
Items
value: Number
date: Date
Run Code Online (Sandbox Code Playgroud)
Calculations
calculation: Number
start_date: Date
end_date: Date
Run Code Online (Sandbox Code Playgroud)
ACalculation是基于DB 中Item所有Items日期在Calculation的开始日期和结束日期之间的值的存储计算。
我认为创建/更新的一个好方法Calculations是在Items集合上创建一个 Mongo 更改流,它侦听对集合的更改Items,然后重新计算相关的Calculations.
问题是,根据Mongo Change Event 文档,当文档被删除时,该fullDocument字段被省略,这将阻止我访问删除Item的日期,这将通知哪些Calculations应该更新。
有什么方法可以访问fullDocument由于文档删除而触发的 Mongo Change 事件?
在版本 4 中,MongoDB 更改流可以使用两个不同的参数来指定恢复更改流的位置:(resumeAfter某些内部标记)和startAtOperationTime时间戳类型。
是否可以通过使用每个更改事件中的发现来完全替换resumeAfter以startAtOperationTime安全恢复更改流clusterTime?
我特别关心的是,我在文档中找不到确切的信息,即startAtOperationTime相同的规则和保证是否适用于可以恢复的内容以及持续多长时间。这里使用的操作时间是否正确保存并且它始终可以用作通常使用的文档令牌的替代品resumeAfter?
我正在使用 Spring Boot 应用程序。我正在尝试为 MongoDB 中的集合修改实现基于回调的事件通知。我已经没有想法了,因为我尝试了以下方法:
经典轮询 - 冗余,因为现有实现是由 UI 轮询的 REST 端点,在其中查询数据。
可尾游标 - 需要对集合进行上限,这可能是一个限制,不足以满足具有非常高存储预测的数据库。
更改流 - 我收到运行时异常,指出存储引擎不支持“多数读取关注”。
collection.watch(asList(Aggregates.match(Filters.in("operationType", asList("insert", "update"))))).forEach(printBlock);
我无权查看引擎配置,但我假设如果 DBA 无法将存储引擎更改为wiredTiger,那么我就无法使用更改流。它是否正确?还有其他解决方案吗?spring 的 mongodb-reactive API 怎么样?我的印象是 API 仍然依赖于可尾游标或更改流。
我正在考虑实现MongoDB变更流读取器,并且我想确保自己做的正确。关于如何实现实际阅读器代码,有很多简单的示例,包括官方文档,我对此不太担心。
但是,我有点担心读者会落在变更流之后而无法跟上变化,我想确保读者能够处理变更流。
mongo服务器是一个群集,为了便于讨论,我们假设它每天都非常忙。鉴于更改流API必须迭代流结果而不是像队列一样对其进行操作,因此更改流API似乎仅与单个实例兼容。因此,我担心,与将新项目推送到流中相比,迭代实例的单个实例完成工作的时间可能更长。
我的直觉是实际上让读者简单地读取流,将更改分批处理,然后将其推入队列,然后其他工作人员可以水平扩展以完成工作。但是,作为读者,我仍然只有一个实例,即使仅做一些将修改放入队列的最小工作,它在理论上仍可能落后于潮流。
所以我的问题是,这有多么现实的担忧,并且有什么办法可以创建一种读者,即使仅将更改流式传输到工作人员队列中,也可以水平扩展?我还应考虑哪些其他因素?
我有一项服务需要监视 Mongo DB 上的集合以在系统中创建更改。我已设法使用 C# 驱动程序建立与副本集的连接,并使用以下代码来测试更改流。
public async Task WatchLoopAsync()
{
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using (var cursor = await _collection.WatchAsync(options))
{
_logger.LogInformation("Watching collection {String}",
_deployments.CollectionNamespace);
await cursor.ForEachAsync(changeStreamDocument =>
{
var document = changeStreamDocument.FullDocument;
_logger.LogInformation("Received document: {String}",
document.ToString());
});
}
}
Run Code Online (Sandbox Code Playgroud)
第一个日志显示,表明它正在使用正确的命名空间监视集合。然后,我将一个文档添加到集合中,希望看到某些内容记录为“已接收文档:...”,但没有任何记录。
我遵循此处文档中给出的异步模式。
问题就在这里。我有mongos连接到远程的本地实例mongod。远程数据库使用基本密码身份验证。我正在尝试使用简单的 Scala 应用程序为特定集合设置 ChangeStream 观察器。实际的代码如下所示:
private val mongo = new MongoClient(
new ServerAddress("localhost", 27017),
MongoCredential.createCredential("username", "myDB", "password".toCharArray),
MongoClientOptions.builder().addServerListener(ServerStateListener).build()
)
private val collection = mongo
.getDatabase(DB)
.getCollection("someObjectsCollection")
private val ch = collection
.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
.iterator()
Run Code Online (Sandbox Code Playgroud)
它断线.fullDocument(FullDocument.UPDATE_LOOKUP)告诉:
Exception in thread "main" com.mongodb.MongoCommandException: Command failed with error 13: 'not authorized on myDB to execute command { aggregate: "someObjectsCollection", pipeline: [ { $changeStream: { fullDocument: "updateLookup" } } ], cursor: {}, $db: "myDB", $clusterTime: { clusterTime: Timestamp(1524064297, 2), …Run Code Online (Sandbox Code Playgroud) 我知道当 mongo 的任何监视实体被更改/添加时,mongo 会在更改流中记录这些更改,应用程序可以监听这些更改。
当单个变更流事件被清除时,变更流的最大容量是多少。是否存在由于最大容量限制而在通知订阅者之前从更改流中删除日志的负面情况。
我在他们的官方页面中找不到任何此类数据:https : //docs.mongodb.com/manual/changeStreams/
changestream ×10
mongodb ×10
node.js ×2
c# ×1
database ×1
events ×1
java ×1
replicaset ×1
scala ×1
spring-boot ×1