Jam*_*uth 17 c# multithreading locking async-await imageprocessor
我试图弄清楚,已经提出了与我的ImageProcessor库的问题在这里添加项目到缓存中,当在那里我得到间歇性文件访问错误.
System.IO.IOException:进程无法访问文件'D:\ home\site\wwwroot\app_data\cache\0\6\5\f\2\7\065f27fc2c8e843443d210a1e84d1ea28bbab6c4.webp',因为它正被另一个进程使用.
我编写了一个类,用于基于散列网址生成的密钥执行异步锁定,但似乎我在实现中遗漏了一些东西.
我的锁类
public sealed class AsyncDuplicateLock
{
/// <summary>
/// The collection of semaphore slims.
/// </summary>
private static readonly ConcurrentDictionary<object, SemaphoreSlim> SemaphoreSlims
= new ConcurrentDictionary<object, SemaphoreSlim>();
/// <summary>
/// Locks against the given key.
/// </summary>
/// <param name="key">
/// The key that identifies the current object.
/// </param>
/// <returns>
/// The disposable <see cref="Task"/>.
/// </returns>
public IDisposable Lock(object key)
{
DisposableScope releaser = new DisposableScope(
key,
s =>
{
SemaphoreSlim locker;
if (SemaphoreSlims.TryRemove(s, out locker))
{
locker.Release();
locker.Dispose();
}
});
SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));
semaphore.Wait();
return releaser;
}
/// <summary>
/// Asynchronously locks against the given key.
/// </summary>
/// <param name="key">
/// The key that identifies the current object.
/// </param>
/// <returns>
/// The disposable <see cref="Task"/>.
/// </returns>
public Task<IDisposable> LockAsync(object key)
{
DisposableScope releaser = new DisposableScope(
key,
s =>
{
SemaphoreSlim locker;
if (SemaphoreSlims.TryRemove(s, out locker))
{
locker.Release();
locker.Dispose();
}
});
Task<IDisposable> releaserTask = Task.FromResult(releaser as IDisposable);
SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));
Task waitTask = semaphore.WaitAsync();
return waitTask.IsCompleted
? releaserTask
: waitTask.ContinueWith(
(_, r) => (IDisposable)r,
releaser,
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
/// <summary>
/// The disposable scope.
/// </summary>
private sealed class DisposableScope : IDisposable
{
/// <summary>
/// The key
/// </summary>
private readonly object key;
/// <summary>
/// The close scope action.
/// </summary>
private readonly Action<object> closeScopeAction;
/// <summary>
/// Initializes a new instance of the <see cref="DisposableScope"/> class.
/// </summary>
/// <param name="key">
/// The key.
/// </param>
/// <param name="closeScopeAction">
/// The close scope action.
/// </param>
public DisposableScope(object key, Action<object> closeScopeAction)
{
this.key = key;
this.closeScopeAction = closeScopeAction;
}
/// <summary>
/// Disposes the scope.
/// </summary>
public void Dispose()
{
this.closeScopeAction(this.key);
}
}
}
Run Code Online (Sandbox Code Playgroud)
用法 - 在HttpModule中
private readonly AsyncDuplicateLock locker = new AsyncDuplicateLock();
using (await this.locker.LockAsync(cachedPath))
{
// Process and save a cached image.
}
Run Code Online (Sandbox Code Playgroud)
谁能发现我哪里出错?我担心我误解了一些基本的东西.
该库的全部源代码存储在Github 这里
Ste*_*ary 36
正如其他回答者指出,原来的代码去除SemaphoreSlim从ConcurrentDictionary它释放的信号之前.所以,你有太多的信号量流失 - 当它们仍在使用时它们被从字典中删除(没有获得,但已经从字典中检索到).
这种"映射锁定"的问题在于,很难知道何时不再需要信号量.一种选择是永远不要丢弃信号量; 这是简单的解决方案,但在您的方案中可能无法接受.另一个选择 - 如果信号量实际上与对象实例相关而不是值(如字符串) - 是使用ephemerons附加它们; 但是,我相信这个选项在你的场景中也是不可接受的.
所以,我们这样做很难.:)
有几种不同的方法可行.我认为从引用计数的角度来看它是有意义的(引用计算字典中的每个信号量).另外,我们希望将减量计数和删除操作设为原子,所以我只使用一个lock(使并发字典变得多余):
public sealed class AsyncDuplicateLock
{
private sealed class RefCounted<T>
{
public RefCounted(T value)
{
RefCount = 1;
Value = value;
}
public int RefCount { get; set; }
public T Value { get; private set; }
}
private static readonly Dictionary<object, RefCounted<SemaphoreSlim>> SemaphoreSlims
= new Dictionary<object, RefCounted<SemaphoreSlim>>();
private SemaphoreSlim GetOrCreate(object key)
{
RefCounted<SemaphoreSlim> item;
lock (SemaphoreSlims)
{
if (SemaphoreSlims.TryGetValue(key, out item))
{
++item.RefCount;
}
else
{
item = new RefCounted<SemaphoreSlim>(new SemaphoreSlim(1, 1));
SemaphoreSlims[key] = item;
}
}
return item.Value;
}
public IDisposable Lock(object key)
{
GetOrCreate(key).Wait();
return new Releaser { Key = key };
}
public async Task<IDisposable> LockAsync(object key)
{
await GetOrCreate(key).WaitAsync().ConfigureAwait(false);
return new Releaser { Key = key };
}
private sealed class Releaser : IDisposable
{
public object Key { get; set; }
public void Dispose()
{
RefCounted<SemaphoreSlim> item;
lock (SemaphoreSlims)
{
item = SemaphoreSlims[Key];
--item.RefCount;
if (item.RefCount == 0)
SemaphoreSlims.Remove(Key);
}
item.Value.Release();
}
}
}
Run Code Online (Sandbox Code Playgroud)
您的实现中的问题源于您希望从字典中删除未使用的储物柜。SemaphoreSlim如果您可以让每个内容永远保留在字典中(直到进程终止),那就简单多了。假设这不是一个可行的选择,您需要克服两个障碍:
ConcurrentDictionary<K,V>集合来执行上述操作。Stephen Cleary 的回答展示了如何使用普通的Dictionary<K,V>. 引用计数器与每个 一起存储,并且所有内容都与单个 Locker 对象上的语句SemaphoreSlim同步。lock在这个答案中,我将展示如何解决第二个问题。
集合的问题ConcurrentDictionary<K,V>在于它仅保护其内部状态免受损坏,而不是其包含的值。因此,如果您使用可变类作为,则为微妙的竞争条件TValue打开了大门,特别是如果您打算将这些值缓存在池中并重用它们。消除竞争条件的技巧是使结构成为不可变的。这样它本质上就成为字典内部状态的一部分,并受到字典的保护。在下面的实现中,是 a ,为了性能\xc2\xb9 和方便也声明为 a :TValueAsyncDuplicateLockTValuereadonly structrecord
public class AsyncDuplicateLock\n{\n private readonly ConcurrentDictionary<object, Entry> _semaphores = new();\n\n private readonly record struct Entry(SemaphoreSlim Semaphore, int RefCount);\n\n public readonly struct Releaser : IDisposable\n {\n private readonly AsyncDuplicateLock _parent;\n private readonly object _key;\n public Releaser(AsyncDuplicateLock parent, object key)\n {\n _parent = parent; _key = key;\n }\n public void Dispose() => _parent.Release(_key);\n }\n\n public async ValueTask<Releaser> LockAsync(object key)\n {\n Entry entry = _semaphores.AddOrUpdate(key,\n static _ => new Entry(new SemaphoreSlim(1, 1), 1),\n static (_, entry) => entry with { RefCount = entry.RefCount + 1 });\n\n await entry.Semaphore.WaitAsync().ConfigureAwait(false);\n return new Releaser(this, key);\n }\n\n private void Release(object key)\n {\n Entry entry;\n while (true)\n {\n bool exists = _semaphores.TryGetValue(key, out entry);\n if (!exists)\n throw new InvalidOperationException("Key not found.");\n if (entry.RefCount > 1)\n {\n Entry newEntry = entry with { RefCount = entry.RefCount - 1 };\n if (_semaphores.TryUpdate(key, newEntry, entry))\n break;\n }\n else\n {\n if (_semaphores.TryRemove(KeyValuePair.Create(key, entry)))\n break;\n }\n }\n entry.Semaphore.Release();\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n请注意,增加和减少RefCount涉及循环旋转while。这是因为当前线程可能会失去与其他线程更新字典的乐观竞争,在这种情况下,它会再次尝试直到成功。旋转在Release方法中是明显的,但也在方法内部发生LockAsync。该方法在内部围绕委托的调用AddOrUpdate采用类似的逻辑。updateValueFactory
性能:在严重争用的情况下,上述实现比基于简单的Dictionary<K,V>实现快约 80%。这是因为ConcurrentDictionary<K,V>内部使用了多个 Locker 对象,因此想要锁定密钥的线程"A"不必等到另一个线程完成获取或释放密钥"B"。不过分配的数量要多得多。如果您有某种理由让垃圾收集器保持放松,那么Dictionary<K,V>基于 - 的实现将为您提供更好的服务。如果你既想要终极速度又想要终极内存效率,你可以看看这个答案的第六版,基于多个Dictionary<K,V>s 的实现。
异常:当SemaphoreSlim类被误用时,它会抛出一个SemaphoreFullException. 当信号量的释放次数多于获取次数时,就会发生这种情况。AsyncDuplicateLock在误用的情况下,该答案的实现行为会有所不同:它会抛出一个InvalidOperationException("Key not found."). 发生这种情况是因为当释放某个键的次数与获取该键的次数相同时,关联的信号量将从字典中删除。如果此实现抛出SemaphoreFullException,则表明存在错误。
注意:就我个人而言,我不喜欢(错误地)将该using语句用于释放非托管资源以外的目的。
\xc2\xb9使用 .在许多操作(、等)中比较ConcurrentDictionary<K,V>s 。默认情况下,结构不会被有效比较,除非它们实现了接口。记录结构确实以与值元组类似的方式实现了此接口,因此可以有效地比较它们的相等性。实际上,使用值元组作为( ) 可能会稍微更有效,因为值元组的成员是字段,而记录结构的成员是属性。不过记录结构更方便。TValueAddOrUpdateTryUpdateTryRemoveEqualityComparer<TValue>.DefaultIEquatable<T>TValue(SemaphoreSlim, int)
| 归档时间: |
|
| 查看次数: |
4809 次 |
| 最近记录: |