C#producer/consumer/observer?

Xod*_*rap 6 c# asp.net multithreading event-handling producer-consumer

我有一个生产者/消费者队列,除了有特定类型的对象.因此,不只是任何消费者都可以使用添加的对象.我不想为每种类型创建一个特定的队列,因为有太多.(它有点延伸了生产者/消费者的定义,但我不确定正确的术语是什么.)

是否有EventWaitHandle这样的东西允许带参数的脉冲?例如myHandle.Set(AddedType = "foo").现在我正在使用Monitor.Wait,然后每个消费者都会检查脉冲是否真的是针对他们的,但这似乎毫无意义.

我现在拥有的pseduocode版本:

class MyWorker {
    public string MyType {get; set;}
    public static Dictionary<string, MyInfo> data;

    public static void DoWork(){
        while(true){
             if(Monitor.Wait(data, timeout)){
                   if (data.ContainsKey(MyType)){
                        // OK, do work
                   }
             }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

正如你所看到的,当其他东西被添加到字典中时,我可能会得到脉冲.我只关心将MyType添加到dict中.有没有办法做到这一点?这不是一个大问题,但是,例如,我现在必须手动处理超时,因为每次锁定都可以在超时内成功,但MyType永远不会添加到dict中timeout.

Bri*_*eon 3

这是个有趣的问题。听起来解决方案的关键是优先级队列的阻塞变体。Java 有 .NET BCL PriorityBlockingQueue,但遗憾的是不存在 .NET BCL 的等效项。然而,一旦有了它,实施起来就很容易了。

class MyWorker 
{
    public string MyType {get; set;}
    public static PriorityBlockingQueue<string, MyInfo> data; 

    public static void DoWork()
    {
        while(true)
        {
            MyInfo value;
            if (data.TryTake(MyType, timeout, out value))
            {
                // OK, do work
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

实现 aPriorityBlockingQueue并不是非常困难。BlockingCollection遵循与利用Add和样式方法相同的模式,Take我想出了以下代码。

public class PriorityBlockingQueue<TKey, TValue>
{
    private SortedDictionary<TKey, TValue> m_Dictionary = new SortedDictionary<TKey,TValue>();

    public void Add(TKey key, TValue value)
    {
        lock (m_Dictionary)
        {
            m_Dictionary.Add(key, value);
            Monitor.Pulse(m_Dictionary);
        }
    }

    public TValue Take(TKey key)
    {
        TValue value;
        TryTake(key, TimeSpan.FromTicks(long.MaxValue), out value);
        return value;
    }

    public bool TryTake(TKey key, TimeSpan timeout, out TValue value)
    {
        value = default(TValue);
        DateTime initial = DateTime.UtcNow;
        lock (m_Dictionary)
        {
            while (!m_Dictionary.TryGetValue(key, out value))
            {
                if (m_Dictionary.Count > 0) Monitor.Pulse(m_Dictionary); // Important!
                TimeSpan span = timeout - (DateTime.UtcNow - initial);
                if (!Monitor.Wait(m_Dictionary, span))
                {
                    return false;
                }
            }
            m_Dictionary.Remove(key);
            return true;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这是一个快速的实施,但也存在一些问题。首先,我根本没有测试过。其次,它使用红黑树(via SortedDictionary)作为底层数据结构。这意味着该TryTake方法的复杂度为 O(log(n))。优先级队列的移除复杂度通常为 O(1)。优先级队列选择的典型数据结构是堆但我发现跳跃列表实际上在实践中更好,原因有几个。这些都不存在于 .NET BCL 中,这就是为什么我使用 aSortedDictionary来代替,尽管它在这种情况下性能较差。

我应该在这里指出,这实际上并不能解决无意义的Wait/Pulse行为。它只是封装在PriorityBlockingQueue类中。但是,至少这肯定会清理代码的核心部分。

看起来您的代码并没有处理每个键的多个对象,但是在添加到字典时使用 aQueue<MyInfo>而不是普通的旧对象可以很容易地添加。MyInfo