如何使用 AggregateToCollection() 将 IMongoQueryable 的结果存储在集合中

Jam*_*ker 3 c# linq iqueryable mongodb

除非我遗漏了什么,否则 C# MongoDb 驱动程序的文档中似乎存在很大的空白。

我正在尝试获取IMongoQueryable(这是各种 LINQWhere、Select 等操作的结果)并将结果存储在数据库端的集合中。当然,我可以在客户端迭代它并以这种方式持久化它,但即使是批量处理也不是高效的,并且在 shell 中使用$mergeor是一个非常容易的操作$out

在一个集合上,有一个方法AggregateToCollection<TResult>()正是我想要的,但它需要一个PiplineDefinition<TDocument, TResult>参数,我不知道如何从IMongoQueryable.

我一直在使用GetExecutionModel()IMongoQueryable获取BsonDocument或 Json 字符串,但我仍然不知道如何将其变成PipelineDefinition我需要的!

我本以为我会找到一个扩展方法,它IMongoQueryable允许我将其发送并合并到一个集合中。

到目前为止,为了能够将 an 发送IMongoQueryable到集合,我有以下内容:

    var executionModelDocument = queryable.GetExecutionModel().ToBsonDocument();

    // somehow turn the document into pipeline stages and a pipeline??

    await _database.GetCollection<TDocument>().AggregateToCollectionAsync<TResult>(pipeline);
Run Code Online (Sandbox Code Playgroud)

我该如何真正实现这一点?

Jam*_*ker 6

好吧,这最终比预期容易,但仍然令人沮丧。

首先,事实证明,您IMongoQueryable只需通过调用即可从 as JSON获取管道ToString()。虽然我说的是 JSON,但它并不完全是 - 输出包含管道的完整 JSON 作为阶段数组,但前面有一个标签并包含括号内的 JSON。我在这里走了一条捷径,只取了一个脏子串:

var queryableJson = queryable.ToString();
var trimmedDocument = queryableJson.Substring(10, queryableJson.Length - 11); // TODO: more reliably get the true json rather than blindly removing what should be "aggregate(" and ")"
Run Code Online (Sandbox Code Playgroud)

接下来,我将 JSON 重新序列化回数组BsonDocumentPipelineDefinition从中创建了 a ( aBsonDocument[]可以隐式转换为 a PipelineDefinition):

PipelineDefinition<TDocument, TResult> pipelineQueryable = BsonSerializer.Deserialize<BsonDocument[]>(trimmedDocument);
Run Code Online (Sandbox Code Playgroud)

请注意,虽然http://mongodb.github.io/mongo-csharp-driver/2.4/reference/driver/definitions/#pipelines上的文档说我可以隐式地为管道投射单个BsonDocument而不是阶段数组,这不是真的,除非在另一个我没有找到的命名空间中存在重载。

到目前为止,我们已经为 IMongoQueryable 定义了管道,我们可以简单地向其添加阶段来实现我们想要的结果(在本例中,是将管道的结果合并到另一个集合中)。您可以指定MergeStageOptions<TResult>对象的属性来控制行为,但默认值对我来说效果很好:

var stageMerge = PipelineStageDefinitionBuilder.Merge<TResult, TResult>(_database.GetCollection<TResult>(), new MergeStageOptions<TResult>());
var mergePipeline = pipelineQueryable.AppendStage(stageMerge);
Run Code Online (Sandbox Code Playgroud)

使用我们新增强的管道,我们可以将其应用于源集合,以将输出合并到目标集合中:

_database.GetCollection<TDocument>().AggregateToCollection(mergePipeline);
Run Code Online (Sandbox Code Playgroud)

为简单起见,我在这里演示的不是异步的,但是我在实际代码中利用了驱动程序中的可等待方法,因为同步方法只是包装了可等待版本。

这真的就是全部了!当我有更多时间时,我会返回并尝试跳过序列化-反序列化步骤,因为它明显很慢并且没有必要。为了流畅起见,我还计划将其转变为扩展方法。