Zeu*_*s82 6 .net c# .net-core .net-core-2.2
我想使用诸如GetOrAdda之类的东西ConcurrentDictionary作为Web服务的缓存。该词典有异步版本吗?GetOrAdd将使用发出Web请求HttpClient,因此,如果该词典的某个版本中GetOrAdd是异步的,那就太好了。
为了消除混淆,字典的内容将是对Web服务的调用的响应。
ConcurrentDictionary<string, Response> _cache = new ConcurrentDictionary<string, Response>();
var response = _cache.GetOrAdd("id", (x) => { _httpClient.GetAsync(x).GetAwaiter().GetResponse();} )
Run Code Online (Sandbox Code Playgroud)
该GetOrAdd方法不太适合用于此目的。因为它不保证工厂只运行一次,它的唯一目的是一个次要的优化(次要的,因为添加是很少见的),因为它不需要散列两次并找到正确的桶(这会发生两次,如果您可以通过两个单独的调用进行设置)。
我建议你先检查缓存,如果在缓存中没有找到值,则进入某种形式的临界区(锁,信号量等),重新检查缓存,如果仍然缺失则取值并插入到缓存中。
这可确保您的后备存储仅被命中一次;即使多个请求同时缓存未命中,也只有第一个请求会实际获取值,其他请求将等待信号量,然后提前返回,因为它们会在临界区重新检查缓存。
伪代码(使用计数为 1 的 SemaphoreSlim,因为您可以异步等待它):
async Task<TResult> GetAsync(TKey key)
{
// Try to fetch from catch
if (cache.TryGetValue(key, out var result)) return result;
// Get some resource lock here, for example use SemaphoreSlim
// which has async wait function:
await semaphore.WaitAsync();
try
{
// Try to fetch from cache again now that we have entered
// the critical section
if (cache.TryGetValue(key, out result)) return result;
// Fetch data from source (using your HttpClient or whatever),
// update your cache and return.
return cache[key] = await FetchFromSourceAsync(...);
}
finally
{
semaphore.Release();
}
}
Run Code Online (Sandbox Code Playgroud)
尝试这个扩展方法:
/// <summary>
/// Adds a key/value pair to the <see cref="ConcurrentDictionary{TKey, TValue}"/> by using the specified function
/// if the key does not already exist. Returns the new value, or the existing value if the key exists.
/// </summary>
public static async Task<TResult> GetOrAddAsync<TKey,TResult>(
this ConcurrentDictionary<TKey,TResult> dict,
TKey key, Func<TKey,Task<TResult>> asyncValueFactory)
{
if (dict.TryGetValue(key, out TResult resultingValue))
{
return resultingValue;
}
var newValue = await asyncValueFactory(key);
return dict.GetOrAdd(key, newValue);
}
Run Code Online (Sandbox Code Playgroud)
相反dict.GetOrAdd(key,key=>something(key)),您使用await dict.GetOrAddAsync(key,async key=>await something(key)). 显然,在这种情况下你只需将其写为await dict.GetOrAddAsync(key,something),但我想说清楚。
关于维护操作顺序的担忧,我有以下意见:
valueFactory 委托在锁外部调用,以避免在锁下执行未知代码可能出现的问题。因此,对于 ConcurrentDictionary<TKey,TValue> 类上的所有其他操作,GetOrAdd 不是原子操作
GetOrAdd 不会成为异步操作,因为访问字典的值不是长期运行的操作。
但是,您可以做的只是将任务存储在词典中,而不是具体化的结果。任何需要结果的人都可以等待该任务。
但是,您还需要确保该操作仅启动一次,而不是多次启动。为了确保某些操作仅运行一次,而不是多次运行,您还需要添加Lazy:
ConcurrentDictionary<string, Lazy<Task<Response>>> _cache = new ConcurrentDictionary<string, Lazy<Task<Response>>>();
var response = await _cache.GetOrAdd("id", url => new Lazy<Task<Response>>(_httpClient.GetAsync(url))).Value;
Run Code Online (Sandbox Code Playgroud)
可能使用具有高级异步功能的专用内存缓存(例如Alastair Crabtree 的LazyCache)比使用简单的ConcurrentDictionary<K,V>. 您将获得常用的功能,例如基于时间的过期,或自动逐出依赖于已过期的其他条目或依赖于可变外部资源(如文件、数据库等)的条目。这些功能对于手动实现来说并不简单。
下面是具有值的 sGetOrAddAsync的自定义扩展方法。它接受一个工厂方法,并确保该方法最多被调用一次。它还确保从字典中删除失败的任务。ConcurrentDictionaryTask<TValue>
/// <summary>
/// Returns an existing task from the concurrent dictionary, or adds a new task
/// using the specified asynchronous factory method. Concurrent invocations for
/// the same key are prevented, unless the task is removed before the completion
/// of the delegate. Failed tasks are evicted from the concurrent dictionary.
/// </summary>
public static Task<TValue> GetOrAddAsync<TKey, TValue>(
this ConcurrentDictionary<TKey, Task<TValue>> source, TKey key,
Func<TKey, Task<TValue>> valueFactory)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(valueFactory);
Task<TValue> currentTask;
if (source.TryGetValue(key, out currentTask))
return currentTask;
Task<Task<TValue>> newTaskTask = new(() => valueFactory(key));
Task<TValue> newTask = null;
newTask = newTaskTask.Unwrap().ContinueWith(task =>
{
if (!task.IsCompletedSuccessfully)
source.TryRemove(KeyValuePair.Create(key, newTask));
return task;
}, default, TaskContinuationOptions.DenyChildAttach |
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default).Unwrap();
currentTask = source.GetOrAdd(key, newTask);
if (ReferenceEquals(currentTask, newTask))
newTaskTask.RunSynchronously(TaskScheduler.Default);
return currentTask;
}
Run Code Online (Sandbox Code Playgroud)
该方法是使用Task创建 Cold 的构造函数来实现的Task,只有在字典中成功添加时才会启动该构造函数。否则,如果另一个线程赢得了添加相同密钥的竞赛,则冷任务将被丢弃。与更简单的技术相比,使用此技术的优点Lazy<Task>是,如果valueFactory阻止当前线程,它也不会阻止正在等待相同密钥的其他线程。相同的技术可用于实现一个AsyncLazy<T>或一个AsyncExpiringLazy<T>类。
使用示例:
ConcurrentDictionary<string, Task<JsonDocument>> cache = new();
JsonDocument document = await cache.GetOrAddAsync("https://example.com", async url =>
{
string content = await _httpClient.GetStringAsync(url);
return JsonDocument.Parse(content);
});
Run Code Online (Sandbox Code Playgroud)
使用同步委托重载valueFactory:
public static Task<TValue> GetOrAddAsync<TKey, TValue>(
this ConcurrentDictionary<TKey, Task<TValue>> source, TKey key,
Func<TKey, TValue> valueFactory)
{
ArgumentNullException.ThrowIfNull(valueFactory);
return source.GetOrAddAsync(key, key => Task.FromResult<TValue>(valueFactory(key)));
}
Run Code Online (Sandbox Code Playgroud)
两个重载都会调用valueFactory当前线程上的委托。如果您出于某种原因更喜欢在 上调用委托ThreadPool,则可以将 替换RunSynchronously为Start。
对于在 .NET 6 之前的 .NET 版本上编译的方法版本GetOrAddAsync,您可以查看此答案的第三个修订版。
优化:缓存Task<TResult>对象有点浪费,因为异步操作完成后,真正需要保留的就是TResult. 一个Task<TResult>对象包含多个内部字段,用于跟踪任务的完成状态、Exception信息等。对于ConcurrentDictionary<TKey, Task<TValue>>包含大量键的对象,所有这些不需要的字段可能会导致大量的内存浪费。减少内存占用的一个相对简单的方法是从 切换Task<TResult>到ValueTask<TResult>. 这个想法是在原始任务ValueTask完成后立即用轻量级结果支持的任务取代繁重的原始任务支持。您可以在这里找到这个想法的实现。它是类型GetOrAddAsync的扩展方法ConcurrentDictionary<TKey, ValueTask<TValue>>。