.Net中的键控锁定

Cam*_*ron 2 .net c# azure azureservicebus

我有一个Azure Service Bus队列,我收到1到10条带有相同"密钥"的消息.其中一条消息需要使用长时间运行的操作进行处理.完成后,数据库将被更新,其他消息将检查它.但是,与此同时,其他消息将被重新排队,以便不会丢失该过程.

但重点是这个长时间运行的操作不能同时运行同一个键,不应该多次运行.

这是我到目前为止所得到的:

void Main()
{
    Enumerable.Range(1, 1000)
              .AsParallel()
              .ForAll(async i => await ManageConcurrency(i % 2, async () => await Task.Delay(TimeSpan.FromSeconds(10)))); 
}

private readonly ConcurrentDictionary<int, SemaphoreSlim> _taskLocks = new ConcurrentDictionary<int, SemaphoreSlim>();

private async Task<bool> ManageConcurrency(int taskId, Func<Task> task)
{
    SemaphoreSlim taskLock = null;

    try
    {
        if (_taskLocks.TryGetValue(taskId, out taskLock))
        {
            if (taskLock.CurrentCount == 0)
            {
                Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, I found. No available.. Thread Id: {Thread.CurrentThread.ManagedThreadId}");
                return false;
            }

            taskLock.Wait();

            Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, I found and took. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
        }
        else
        {
            taskLock = new SemaphoreSlim(1, 1);
            taskLock = _taskLocks.GetOrAdd(taskId, taskLock);
            if (taskLock.CurrentCount == 0)
            {
                Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, I didn't find, and then found/created. None available.. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
                return false;
            }
            else
            {
                taskLock.Wait(TimeSpan.FromSeconds(1));

                Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, I didn't find, then found/created, and took. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
            }
        }

        Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, Lock pulled for TaskId {taskId}, Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");

        await task.Invoke();

        return true;
    }
    catch (Exception e)
    {
        ;
        return false;
    }
    finally
    {
        //taskLock?.Release();

        _taskLocks.TryRemove(taskId, out taskLock);

        //Console.WriteLine($"I removed. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
    }
}
Run Code Online (Sandbox Code Playgroud)

它没有按预期工作,因为它将创建多个信号量,突然我的长时间运行操作使用相同的键运行两次.我认为问题是因为整个操作不是原子的.

解决这个问题的最佳方法是什么?

usr*_*usr 5

您正确认识到需要确保每个密钥只创建一个信号量.标准的习惯用法是:

var dict = new ConcurrentDictionary<TKey, Lazy<SemaphoreSlim>>();
...
var sem = dict.GetOrAdd( , _ => new new Lazy<SemaphoreSlim>(() => SemaphoreSlim(1, 1))).Value;
Run Code Online (Sandbox Code Playgroud)

可能会创建多个lazies,但只有其中一个会被揭示和具体化.

除此之外,依靠内存状态是一个值得怀疑的做法.如果您的队列处理应用程序回收并且所有信号量都丢失了怎么办?您最好使用持久性存储来跟踪此锁定信息.