使用 LINQ 的 AsParallel() 并行运行异步函数

vrc*_*cks 5 async-await azure-cosmosdb

我有一个文档数据库存储库类,它有一个 get 方法,如下所示:

private static DocumentClient client;
public async Task<TEntity> Get(string id, string partitionKey = null)
{
    try
    {
        RequestOptions requestOptions = null;
        if (partitionKey != null)
        {
            requestOptions = new RequestOptions { PartitionKey = new PartitionKey(partitionKey) };
        }

        var result = await client.ReadDocumentAsync(
            UriFactory.CreateDocumentUri(DatabaseId, CollectionId, id),
            requestOptions);
        return (TEntity)(dynamic)result.Resource;
    }
    catch (DocumentClientException e)
    {
        // Have logic for different exceptions actually
        throw;
    }
}
Run Code Online (Sandbox Code Playgroud)

我有两个集合 - Collection1 和 Collection2。Collection1 未分区,而 Collection2 已分区。

在客户端,我创建两个存储库对象,每个集合一个。

private static DocumentDBRepository<Collection1Item> collection1Repository = new DocumentDBRepository<Collection1Item>("Collection1");
private static DocumentDBRepository<Collection2Item> collection2Repository = new DocumentDBRepository<Collection2Item>("Collection2");

List<Collection1Item> collection1Items = await collection1Repository.GetItemsFromCollection1(); // Selects first forty documents based on time

List<UIItem> uiItems = new List<UIItem>();
foreach (var item in collection1Items)
{
    var collection2Item = await storageRepository.Get(item.Collection2Reference, item.TargetId);    // TargetId is my partition key for Collection2
    uiItems.Add(new UIItem
    {
        ItemId = item.ItemId,
        Collection1Reference = item.Id,
        TargetId = item.TargetId,        
        Collection2Reference = item.Collection2Reference,
        Value = collection2Item.Value
    });
}
Run Code Online (Sandbox Code Playgroud)

这很好用。但由于它是与 foreach 顺序发生的,所以我想并行执行这些 Get 调用。当我如下并行执行时:

ConcurrentBag<UIItem> uiItems = new ConcurrentBag<UIItem>();
collection1Items.AsParallel().ForAll(async item => {
    var collection2Item = await storageRepository.Get(item.Collection2Reference, item.TargetId);    // TargetId is my partition key for Collection2
    uiItems.Add(new UIItem
    {
        ItemId = item.ItemId,
        Collection1Reference = item.Id,
        TargetId = item.TargetId,        
        Collection2Reference = item.Collection2Reference,
        Value = collection2Item.Value
    });
  }
);
Run Code Online (Sandbox Code Playgroud)

它不起作用并且uiItems始终为空。

Pan*_*vos 6

您不需要 Parallel.For 来同时运行异步操作。如果它们确实是异步的,那么它们已经同时运行。

您可以收集从每个操作返回的任务,然后对所有任务简单地调用await Task.WhenAll()。如果您修改 lambda 以创建并返回 UIItem,则结果await Task.WhenAll()将是 UIItem 的集合。无需从并发操作内部修改全局状态。

例如:

var itemTasks = collection1Items.Select(async item =>
    {
        var collection2Item = await storageRepository.Get(item.Collection2Reference, item.TargetId);    
        return new UIItem
        {
            ItemId = item.ItemId,
            Collection1Reference = item.Id,
            TargetId = item.TargetId,        
            Collection2Reference = item.Collection2Reference,
            Value = collection2Item.Value
        }
    });

var results= await Task.WhenAll(itemTasks);
Run Code Online (Sandbox Code Playgroud)

但请注意 - 这将同时触发所有 Get操作。这可能不是您想要的,尤其是在调用具有速率限制的服务时。

  • @MiklósTóth 有更好的选择,例如*不*生成数千个任务,而是使用 TPL 数据流或反应式扩展来控制并发任务的数量甚至速率。无需对数据进行分区。ForEachAsync 不是为了限制任务速率而创建的 (4认同)