基于密钥的异步锁定

Jam*_*uth 17 c# multithreading locking async-await imageprocessor

我试图弄清楚,已经提出了与我的ImageProcessor库的问题在这里添加项目到缓存中,当在那里我得到间歇性文件访问错误.

System.IO.IOException:进程无法访问文件'D:\ home\site\wwwroot\app_data\cache\0\6\5\f\2\7\065f27fc2c8e843443d210a1e84d1ea28bbab6c4.webp',因为它正被另一个进程使用.

我编写了一个类,用于基于散列网址生成的密钥执行异步锁定,但似乎我在实现中遗漏了一些东西.

我的锁类

public sealed class AsyncDuplicateLock
{
    /// <summary>
    /// The collection of semaphore slims.
    /// </summary>
    private static readonly ConcurrentDictionary<object, SemaphoreSlim> SemaphoreSlims
                            = new ConcurrentDictionary<object, SemaphoreSlim>();

    /// <summary>
    /// Locks against the given key.
    /// </summary>
    /// <param name="key">
    /// The key that identifies the current object.
    /// </param>
    /// <returns>
    /// The disposable <see cref="Task"/>.
    /// </returns>
    public IDisposable Lock(object key)
    {
        DisposableScope releaser = new DisposableScope(
        key,
        s =>
        {
            SemaphoreSlim locker;
            if (SemaphoreSlims.TryRemove(s, out locker))
            {
                locker.Release();
                locker.Dispose();
            }
        });

        SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));
        semaphore.Wait();
        return releaser;
    }

    /// <summary>
    /// Asynchronously locks against the given key.
    /// </summary>
    /// <param name="key">
    /// The key that identifies the current object.
    /// </param>
    /// <returns>
    /// The disposable <see cref="Task"/>.
    /// </returns>
    public Task<IDisposable> LockAsync(object key)
    {
        DisposableScope releaser = new DisposableScope(
        key,
        s =>
        {
            SemaphoreSlim locker;
            if (SemaphoreSlims.TryRemove(s, out locker))
            {
                locker.Release();
                locker.Dispose();
            }
        });

        Task<IDisposable> releaserTask = Task.FromResult(releaser as IDisposable);
        SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));

        Task waitTask = semaphore.WaitAsync();

        return waitTask.IsCompleted
                   ? releaserTask
                   : waitTask.ContinueWith(
                       (_, r) => (IDisposable)r,
                       releaser,
                       CancellationToken.None,
                       TaskContinuationOptions.ExecuteSynchronously,
                       TaskScheduler.Default);
    }

    /// <summary>
    /// The disposable scope.
    /// </summary>
    private sealed class DisposableScope : IDisposable
    {
        /// <summary>
        /// The key
        /// </summary>
        private readonly object key;

        /// <summary>
        /// The close scope action.
        /// </summary>
        private readonly Action<object> closeScopeAction;

        /// <summary>
        /// Initializes a new instance of the <see cref="DisposableScope"/> class.
        /// </summary>
        /// <param name="key">
        /// The key.
        /// </param>
        /// <param name="closeScopeAction">
        /// The close scope action.
        /// </param>
        public DisposableScope(object key, Action<object> closeScopeAction)
        {
            this.key = key;
            this.closeScopeAction = closeScopeAction;
        }

        /// <summary>
        /// Disposes the scope.
        /// </summary>
        public void Dispose()
        {
            this.closeScopeAction(this.key);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

用法 - 在HttpModule中

private readonly AsyncDuplicateLock locker = new AsyncDuplicateLock();

using (await this.locker.LockAsync(cachedPath))
{
    // Process and save a cached image.
}
Run Code Online (Sandbox Code Playgroud)

谁能发现我哪里出错?我担心我误解了一些基本的东西.

该库的全部源代码存储在Github 这里

Ste*_*ary 36

正如其他回答者指出,原来的代码去除SemaphoreSlimConcurrentDictionary它释放的信号之前.所以,你有太多的信号量流失 - 当它们仍在使用时它们被从字典中删除(没有获得,但已经从字典中检索到).

这种"映射锁定"的问题在于,很难知道何时不再需要信号量.一种选择是永远不要丢弃信号量; 这是简单的解决方案,但在您的方案中可能无法接受.另一个选择 - 如果信号量实际上与对象实例相关而不是值(如字符串) - 是使用ephemerons附加它们; 但是,我相信这个选项在你的场景中也是不可接受的.

所以,我们这样做很难.:)

有几种不同的方法可行.我认为从引用计数的角度来看它是有意义的(引用计算字典中的每个信号量).另外,我们希望将减量计数和删除操作设为原子,所以我只使用一个lock(使并发字典变得多余):

public sealed class AsyncDuplicateLock
{
  private sealed class RefCounted<T>
  {
    public RefCounted(T value)
    {
      RefCount = 1;
      Value = value;
    }

    public int RefCount { get; set; }
    public T Value { get; private set; }
  }

  private static readonly Dictionary<object, RefCounted<SemaphoreSlim>> SemaphoreSlims
                        = new Dictionary<object, RefCounted<SemaphoreSlim>>();

  private SemaphoreSlim GetOrCreate(object key)
  {
    RefCounted<SemaphoreSlim> item;
    lock (SemaphoreSlims)
    {
      if (SemaphoreSlims.TryGetValue(key, out item))
      {
        ++item.RefCount;
      }
      else
      {
        item = new RefCounted<SemaphoreSlim>(new SemaphoreSlim(1, 1));
        SemaphoreSlims[key] = item;
      }
    }
    return item.Value;
  }

  public IDisposable Lock(object key)
  {
    GetOrCreate(key).Wait();
    return new Releaser { Key = key };
  }

  public async Task<IDisposable> LockAsync(object key)
  {
    await GetOrCreate(key).WaitAsync().ConfigureAwait(false);
    return new Releaser { Key = key };
  }

  private sealed class Releaser : IDisposable
  {
    public object Key { get; set; }

    public void Dispose()
    {
      RefCounted<SemaphoreSlim> item;
      lock (SemaphoreSlims)
      {
        item = SemaphoreSlims[Key];
        --item.RefCount;
        if (item.RefCount == 0)
          SemaphoreSlims.Remove(Key);
      }
      item.Value.Release();
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

  • 这太棒了!一种优雅的方法,我在尝试时会永远过度设计。我得到了一个从未处理过的实现,但对不断增加的内存使用量真的很不满意。非常感谢! (3认同)
  • Ephemerons是一种动态语言概念,它将一个对象与另一个对象的生命联系起来.就像您可以添加到`ExpandoObject`的属性一样,但是ephemerons可以附加到任何对象(在这方面更像是JavaScript属性).唯一的.NET ephemeron是`ConditionalWeakTable`,一个难以使用的对象.我写了一个名为[ConnectedProperties]的简单包装库(https://www.nuget.org/packages/Nito.ConnectedProperties/4.0.0-alpha-1). (2认同)
  • @太:我不会。锁定所花费的时间非常短,因此即使大多数用途是读取,也可能不值得更改。 (2认同)
  • @kor_:使用专用对象锁定从来不是一个坏主意。在这种特定情况下,我可以锁定字典实例,因为它是私有的并且从不公开。由于这里的代码是唯一可以访问字典实例的代码,因此我知道没有其他代码可以锁定它。 (2认同)
  • 基于此解决方案,我创建了一个 .NET Standard 2.0 库,可在 NuGet https://www.nuget.org/packages/AsyncKeyedLock 和 GitHub https://github.com/MarkCiliaVincenti/AsyncKeyedLock 上使用 (2认同)

The*_*ias 6

您的实现中的问题源于您希望从字典中删除未使用的储物柜。SemaphoreSlim如果您可以让每个内容永远保留在字典中(直到进程终止),那就简单多了。假设这不是一个可行的选择,您需要克服两个障碍:

\n
    \n
  1. 如何跟踪有多少工作人员正在使用每个信号量,以便您知道何时可以安全地删除它。
  2. \n
  3. 如何使用高性能但棘手的ConcurrentDictionary<K,V>集合来执行上述操作。
  4. \n
\n

Stephen Cleary 的回答展示了如何使用普通的Dictionary<K,V>. 引用计数器与每个 一起存储,并且所有内容都与单个 Locker 对象上的语句SemaphoreSlim同步。lock在这个答案中,我将展示如何解决第二个问题。

\n

集合的问题ConcurrentDictionary<K,V>在于它仅保护其内部状态免受损坏,而不是其包含的值。因此,如果您使用可变类作为,则为微妙的竞争条件TValue打开了大门,特别是如果您打算将这些值缓存在池中并重用它们。消除竞争条件的技巧是使结构成为不可变的。这样它本质上就成为字典内部状态的一部分,并受到字典的保护。在下面的实现中,是 a ,为了性能\xc2\xb9 和方便也声明为 a :TValueAsyncDuplicateLockTValuereadonly structrecord

\n
public class AsyncDuplicateLock\n{\n    private readonly ConcurrentDictionary<object, Entry> _semaphores = new();\n\n    private readonly record struct Entry(SemaphoreSlim Semaphore, int RefCount);\n\n    public readonly struct Releaser : IDisposable\n    {\n        private readonly AsyncDuplicateLock _parent;\n        private readonly object _key;\n        public Releaser(AsyncDuplicateLock parent, object key)\n        {\n            _parent = parent; _key = key;\n        }\n        public void Dispose() => _parent.Release(_key);\n    }\n\n    public async ValueTask<Releaser> LockAsync(object key)\n    {\n        Entry entry = _semaphores.AddOrUpdate(key,\n            static _ => new Entry(new SemaphoreSlim(1, 1), 1),\n            static (_, entry) => entry with { RefCount = entry.RefCount + 1 });\n\n        await entry.Semaphore.WaitAsync().ConfigureAwait(false);\n        return new Releaser(this, key);\n    }\n\n    private void Release(object key)\n    {\n        Entry entry;\n        while (true)\n        {\n            bool exists = _semaphores.TryGetValue(key, out entry);\n            if (!exists)\n                throw new InvalidOperationException("Key not found.");\n            if (entry.RefCount > 1)\n            {\n                Entry newEntry = entry with { RefCount = entry.RefCount - 1 };\n                if (_semaphores.TryUpdate(key, newEntry, entry))\n                    break;\n            }\n            else\n            {\n                if (_semaphores.TryRemove(KeyValuePair.Create(key, entry)))\n                    break;\n            }\n        }\n        entry.Semaphore.Release();\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

请注意,增加和减少RefCount涉及循环旋转while。这是因为当前线程可能会失去与其他线程更新字典的乐观竞争,在这种情况下,它会再次尝试直到成功。旋转在Release方法中是明显的,但也在方法内部发生LockAsync。该方法在内部围绕委托的调用AddOrUpdate采用类似的逻辑。updateValueFactory

\n

性能:在严重争用的情况下,上述实现比基于简单的Dictionary<K,V>实现快约 80%。这是因为ConcurrentDictionary<K,V>内部使用了多个 Locker 对象,因此想要锁定密钥的线程"A"不必等到另一个线程完成获取或释放密钥"B"。不过分配的数量要多得多。如果您有某种理由让垃圾收集器保持放松,那么Dictionary<K,V>基于 - 的实现将为您提供更好的服务。如果你既想要终极速度又想要终极内存效率,你可以看看这个答案的第六版,基于多个Dictionary<K,V>s 的实现。

\n

异常:SemaphoreSlim类被误用时,它会抛出一个SemaphoreFullException. 当信号量的释放次数多于获取次数时,就会发生这种情况。AsyncDuplicateLock在误用的情况下,该答案的实现行为会有所不同:它会抛出一个InvalidOperationException("Key not found."). 发生这种情况是因为当释放某个键的次数与获取该键的次数相同时,关联的信号量将从字典中删除。如果此实现抛出SemaphoreFullException,则表明存在错误。

\n

注意:就我个人而言,我不喜欢(错误地)将该using语句用于释放非托管资源以外的目的。

\n

\xc2\xb9使用 .在许多操作(等)中比较ConcurrentDictionary<K,V>s 。默认情况下,结构不会被有效比较,除非它们实现了接口。记录结构确实以与值元组类似的方式实现了此接口,因此可以有效地比较它们的相等性。实际上,使用值元组作为( ) 可能会稍微更有效,因为值元组的成员是字段,而记录结构的成员是属性。不过记录结构更方便。TValueAddOrUpdateTryUpdateTryRemoveEqualityComparer<TValue>.DefaultIEquatable<T>TValue(SemaphoreSlim, int)

\n

  • @DeivydasVoroneckis 问题本身有一个例子(在标题“用法 - 在 HttpModule 内”下)。此答案中“AsyncDuplicateLock”的 API 与问题中“AsyncDuplicateLock”的 API 几乎相同。 (2认同)