并发优先收集

Dmi*_*nov 1 c# collections concurrency multithreading producer-consumer

我有许多线程从服务器列表中检索数据。每 5 分钟从服务器解析器下载一次服务器列表。我用于处理数据的线程应该仅使用响应时间最短的服务器。每个服务器的响应时间可能因请求而异。因此,在更新服务器列表之间的时间范围内,我应该验证每个服务器的响应时间。

我最初的方法是创建两个额外的线程:第一个线程更新服务器列表,第二个线程验证每个服务器的响应时间并根据服务器的响应时间对服务器列表进行排序。

我尝试使用BlockingCollection<T>它来连接生产者和消费者,但在我的任务中,我有两个并发消费者,并且也BlockingCollection<T>没有插入项目来创建服务器优先列表的本机能力。

ConcurrentStack<T>或者ConcurrentQueue<T>也不能按原样使用,因为它们像 as 一样是非阻塞的,BlockingCollection<T>并且它们需要额外的机制来阻塞需要队列中的项目的线程。

请帮我解决这个任务。

The*_*ias 5

随着 .NET 6 的出现,一个新的类PriorityQueue<TElement, TPriority>已经可用。这不是一个线程安全的集合,但尽管如此,它可以很容易地用作实现的后备存储IProducerConsumerCollection<T>,而实现又可以成为类的底层数据存储BlockingCollection<T>。下面是这样的实现,包含完成这项工作所需的最少逻辑:

public class ProducerConsumerPriorityQueue<TElement, TPriority>
    : IProducerConsumerCollection<(TElement, TPriority)>
{
    private readonly PriorityQueue<TElement, (TPriority, long)> _queue;
    private long _index = 0;

    public ProducerConsumerPriorityQueue(IComparer<TPriority> comparer = default)
    {
        comparer ??= Comparer<TPriority>.Default;
        _queue = new(Comparer<(TPriority, long)>.Create((x, y) =>
        {
            int result = comparer.Compare(x.Item1, y.Item1);
            if (result == 0) result = x.Item2.CompareTo(y.Item2);
            return result;
        }));
    }

    public int Count { get { lock (_queue) return _queue.Count; } }

    public bool TryAdd((TElement, TPriority) item)
    {
        lock (_queue) _queue.Enqueue(item.Item1, (item.Item2, ++_index));
        return true;
    }

    public bool TryTake(out (TElement, TPriority) item)
    {
        lock (_queue)
        {
            if (_queue.TryDequeue(out var element, out var priority))
            {
                item = (element, priority.Item1); return true;
            }
            item = default; return false;
        }
    }

    public bool IsSynchronized => false;

    public object SyncRoot => throw new NotSupportedException();

    public (TElement, TPriority)[] ToArray()
        => throw new NotSupportedException();

    public void CopyTo((TElement, TPriority)[] array, int index)
        => throw new NotSupportedException();

    public void CopyTo(Array array, int index)
        => throw new NotSupportedException();

    public IEnumerator<(TElement, TPriority)> GetEnumerator()
        => throw new NotSupportedException();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
Run Code Online (Sandbox Code Playgroud)

只有成员CountTryAddTryTake被实现,但它们就足够了。它们以线程安全的方式实现,因为类的文档BlockingCollection<T>要求这样做:

IProducerConsumerCollection<T>表示允许线程安全地添加和删除数据的集合。

使用示例:

var collection = new BlockingCollection<(string Server, int Priority)>(
    new ProducerConsumerPriorityQueue<string, int>());

collection.Add(("Server-A", 20));
collection.Add(("Server-B", 10));
collection.Add(("Server-C", 20));
collection.CompleteAdding();

foreach (var (server, priority) in collection.GetConsumingEnumerable())
{
    Console.WriteLine($"Server: {server}, Priority: {priority}");
}
Run Code Online (Sandbox Code Playgroud)

输出:

public class ProducerConsumerPriorityQueue<TElement, TPriority>
    : IProducerConsumerCollection<(TElement, TPriority)>
{
    private readonly PriorityQueue<TElement, (TPriority, long)> _queue;
    private long _index = 0;

    public ProducerConsumerPriorityQueue(IComparer<TPriority> comparer = default)
    {
        comparer ??= Comparer<TPriority>.Default;
        _queue = new(Comparer<(TPriority, long)>.Create((x, y) =>
        {
            int result = comparer.Compare(x.Item1, y.Item1);
            if (result == 0) result = x.Item2.CompareTo(y.Item2);
            return result;
        }));
    }

    public int Count { get { lock (_queue) return _queue.Count; } }

    public bool TryAdd((TElement, TPriority) item)
    {
        lock (_queue) _queue.Enqueue(item.Item1, (item.Item2, ++_index));
        return true;
    }

    public bool TryTake(out (TElement, TPriority) item)
    {
        lock (_queue)
        {
            if (_queue.TryDequeue(out var element, out var priority))
            {
                item = (element, priority.Item1); return true;
            }
            item = default; return false;
        }
    }

    public bool IsSynchronized => false;

    public object SyncRoot => throw new NotSupportedException();

    public (TElement, TPriority)[] ToArray()
        => throw new NotSupportedException();

    public void CopyTo((TElement, TPriority)[] array, int index)
        => throw new NotSupportedException();

    public void CopyTo(Array array, int index)
        => throw new NotSupportedException();

    public IEnumerator<(TElement, TPriority)> GetEnumerator()
        => throw new NotSupportedException();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
Run Code Online (Sandbox Code Playgroud)

在线演示

保留具有相同优先级的项目的插入顺序。