ConcurrentDictionary GetOr添加异步

Zeu*_*s82 6 .net c# .net-core .net-core-2.2

我想使用诸如GetOrAdda之类的东西ConcurrentDictionary作为Web服务的缓存。该词典有异步版本吗?GetOrAdd将使用发出Web请求HttpClient,因此,如果该词典的某个版本中GetOrAdd是异步的,那就太好了。

为了消除混淆,字典的内容将是对Web服务的调用的响应。

ConcurrentDictionary<string, Response> _cache = new ConcurrentDictionary<string, Response>();



var response = _cache.GetOrAdd("id", (x) => { _httpClient.GetAsync(x).GetAwaiter().GetResponse();} )
Run Code Online (Sandbox Code Playgroud)

ody*_*jii 9

GetOrAdd方法不太适合用于此目的。因为它不保证工厂只运行一次,它的唯一目的是一个次要的优化(次要的,因为添加是很少见的),因为它不需要散列两次并找到正确的桶(这会发生两次,如果您可以通过两个单独的调用进行设置)。

我建议你先检查缓存,如果在缓存中没有找到值,则进入某种形式的临界区(锁,信号量等),重新检查缓存,如果仍然缺失则取值并插入到缓存中。

这可确保您的后备存储仅被命中一次;即使多个请求同时缓存未命中,也只有第一个请求会实际获取值,其他请求将等待信号量,然后提前返回,因为它们会在临界区重新检查缓存。

伪代码(使用计数为 1 的 SemaphoreSlim,因为您可以异步等待它):

async Task<TResult> GetAsync(TKey key)
{
    // Try to fetch from catch
    if (cache.TryGetValue(key, out var result)) return result;

    // Get some resource lock here, for example use SemaphoreSlim 
    // which has async wait function:
    await semaphore.WaitAsync();    
    try 
    {
        // Try to fetch from cache again now that we have entered 
        // the critical section
        if (cache.TryGetValue(key, out result)) return result;

        // Fetch data from source (using your HttpClient or whatever), 
        // update your cache and return.
        return cache[key] = await FetchFromSourceAsync(...);
    }
    finally
    {
        semaphore.Release();
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 是的,如果集合发生变化,“确实”很重要。例如,这可能会导致多次执行不应该重复的工作。 (5认同)
  • 该集合是ConcurrentDictionary,集合本身是线程安全的。您在这里出于不同的原因锁定。 (3认同)
  • 该集合不会抛出某种索引越界异常或返回垃圾数据,因为它被设计为从多个线程使用,但您现在尝试按顺序从中执行多个操作,并且不依赖于任何更改到那段时间的收藏,它不会为您提供。您不仅需要在此处显式锁定,还需要使用该集合在“任何地方”显式锁定,以确保其他人在您发现该值丢失后不会添加该值,或类似的情况。 (2认同)

Sid*_*dex 8

尝试这个扩展方法:

/// <summary>
/// Adds a key/value pair to the <see cref="ConcurrentDictionary{TKey, TValue}"/> by using the specified function 
/// if the key does not already exist. Returns the new value, or the existing value if the key exists.
/// </summary>
public static async Task<TResult> GetOrAddAsync<TKey,TResult>(
    this ConcurrentDictionary<TKey,TResult> dict,
    TKey key, Func<TKey,Task<TResult>> asyncValueFactory)
{
    if (dict.TryGetValue(key, out TResult resultingValue))
    {
        return resultingValue;
    }
    var newValue = await asyncValueFactory(key);
    return dict.GetOrAdd(key, newValue);
}
Run Code Online (Sandbox Code Playgroud)

相反dict.GetOrAdd(key,key=>something(key)),您使用await dict.GetOrAddAsync(key,async key=>await something(key)). 显然,在这种情况下你只需将其写为await dict.GetOrAddAsync(key,something),但我想说清楚。

关于维护操作顺序的担忧,我有以下意见:

  1. 如果你看看它的实现方式,使用普通的 GetOrAdd 将获得相同的效果。我确实使用了相同的代码并使其适用于异步。参考资料说

valueFactory 委托在锁外部调用,以避免在锁下执行未知代码可能出现的问题。因此,对于 ConcurrentDictionary<TKey,TValue> 类上的所有其他操作,GetOrAdd 不是原子操作

  1. ConcurrentDictionary 不支持 SyncRoot,它们使用内部锁定机制,因此不可能对其进行锁定。不过,使用您自己的锁定机制仅适用于此扩展方法。如果您使用另一个流程(例如使用 GetOrAdd),您将面临同样的问题。


Ser*_*rvy 7

GetOrAdd 不会成为异步操作,因为访问字典的值不是长期运行的操作。

但是,您可以做的只是将任务存储在词典中,而不是具体化的结果。任何需要结果的人都可以等待该任务。

但是,您还需要确保该操作仅启动一次,而不是多次启动。为了确保某些操作仅运行一次,而不是多次运行,您还需要添加Lazy

ConcurrentDictionary<string, Lazy<Task<Response>>> _cache = new ConcurrentDictionary<string, Lazy<Task<Response>>>();

var response = await _cache.GetOrAdd("id", url => new Lazy<Task<Response>>(_httpClient.GetAsync(url))).Value;
Run Code Online (Sandbox Code Playgroud)

  • 这会将不完整的“任务”放入缓存中。如果“任务”出现故障或被取消会发生什么?该任务代表对远程资源的 HTTP 请求,失败的可能性不可忽略。 (3认同)
  • 它不需要是处理它的缓存的最终消费者,它可以是 OP 编写的代码的包装器。此答案中的代码不是完整的生产就绪的全功能缓存。它展示了如何解决所提出的问题,OP 需要在自己的包装缓存中完成该问题,以使其成为有价值的代码。就像你的答案有问题一样,它没有完成生产就绪的代码,而只是所问问题的解决方案。 (3认同)
  • 这对于缓存来说绝对是可怕的设计。它完全打破了抽象。如果我从子系统中获取一个值,则清理其内部缓存不是我的责任,因为它的实现已损坏。 (2认同)
  • @Darragh但是你不止一次执行该操作。这通常是不可接受的。Lazy 并不确保操作返回得更快,它确保它永远不会运行多次。 (2认同)

The*_*ias 5

可能使用具有高级异步功能的专用内存缓存(例如Alastair Crabtree 的LazyCache)比使用简单的ConcurrentDictionary<K,V>. 您将获得常用的功能,例如基于时间的过期,或自动逐出依赖于已过期的其他条目或依赖于可变外部资源(如文件、数据库等)的条目。这些功能对于手动实现来说并不简单。

下面是具有值的 sGetOrAddAsync的自定义扩展方法。它接受一个工厂方法,并确保该方法最多被调用一次。它还确保从字典中删除失败的任务。ConcurrentDictionaryTask<TValue>

/// <summary>
/// Returns an existing task from the concurrent dictionary, or adds a new task
/// using the specified asynchronous factory method. Concurrent invocations for
/// the same key are prevented, unless the task is removed before the completion
/// of the delegate. Failed tasks are evicted from the concurrent dictionary.
/// </summary>
public static Task<TValue> GetOrAddAsync<TKey, TValue>(
    this ConcurrentDictionary<TKey, Task<TValue>> source, TKey key,
    Func<TKey, Task<TValue>> valueFactory)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(valueFactory);
    Task<TValue> currentTask;
    if (source.TryGetValue(key, out currentTask))
        return currentTask;

    Task<Task<TValue>> newTaskTask = new(() => valueFactory(key));
    Task<TValue> newTask = null;
    newTask = newTaskTask.Unwrap().ContinueWith(task =>
    {
        if (!task.IsCompletedSuccessfully)
            source.TryRemove(KeyValuePair.Create(key, newTask));
        return task;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default).Unwrap();

    currentTask = source.GetOrAdd(key, newTask);
    if (ReferenceEquals(currentTask, newTask))
        newTaskTask.RunSynchronously(TaskScheduler.Default);

    return currentTask;
}
Run Code Online (Sandbox Code Playgroud)

该方法是使用Task创建 Cold 的构造函数来实现的Task,只有在字典中成功添加时才会启动该构造函数。否则,如果另一个线程赢得了添加相同密钥的竞赛,则冷任务将被丢弃。与更简单的技术相比,使用此技术的优点Lazy<Task>是,如果valueFactory阻止当前线程,它也不会阻止正在等待相同密钥的其他线程。相同的技术可用于实现一个AsyncLazy<T>或一个AsyncExpiringLazy<T>类。

使用示例:

ConcurrentDictionary<string, Task<JsonDocument>> cache = new();

JsonDocument document = await cache.GetOrAddAsync("https://example.com", async url =>
{
    string content = await _httpClient.GetStringAsync(url);
    return JsonDocument.Parse(content);
});
Run Code Online (Sandbox Code Playgroud)

使用同步委托重载valueFactory

public static Task<TValue> GetOrAddAsync<TKey, TValue>(
    this ConcurrentDictionary<TKey, Task<TValue>> source, TKey key,
    Func<TKey, TValue> valueFactory)
{
    ArgumentNullException.ThrowIfNull(valueFactory);
    return source.GetOrAddAsync(key, key => Task.FromResult<TValue>(valueFactory(key)));
}
Run Code Online (Sandbox Code Playgroud)

两个重载都会调用valueFactory当前线程上的委托。如果您出于某种原因更喜欢在 上调用委托ThreadPool,则可以将 替换RunSynchronouslyStart

对于在 .NET 6 之前的 .NET 版本上编译的方法版本GetOrAddAsync,您可以查看此答案的第三个修订版

优化:缓存Task<TResult>对象有点浪费,因为异步操作完成后,真正需要保留的就是TResult. 一个Task<TResult>对象包含多个内部字段,用于跟踪任务的完成状态、Exception信息等。对于ConcurrentDictionary<TKey, Task<TValue>>包含大量键的对象,所有这些不需要的字段可能会导致大量的内存浪费。减少内存占用的一个相对简单的方法是从 切换Task<TResult>ValueTask<TResult>. 这个想法是在原始任务ValueTask完成后立即用轻量级结果支持的任务取代繁重的原始任务支持。您可以在这里找到这个想法的实现。它是类型GetOrAddAsync的扩展方法ConcurrentDictionary<TKey, ValueTask<TValue>>