我有一个生产者通过爆发产生整数(在几秒钟内产生1到50).我有一个消费者按块使用这些整数.
我希望消费者在制作人完成他的爆发时开始消费(我没有制作人的领先优势,我只知道它已经完成生产,当没有任何产生5秒).
我想到了两种不同的方式:
第一:使用一种不同于另一种消费者的消费者:
private readonly List<int> _ids = new List<int>();
private readonly ManualResetEvent _mainWaiter = new ManualResetEvent(false);
private readonly ManualResetEvent _secondaryWaiter = new ManualResetEvent(false);
//This methods consumes the id from the producer
public void OnConsumeId(int newId)
{
lock(_ids)
{
_ids.Add(newId);
_mainWaiter.Set();
_secondaryWaiter.Set();
}
}
//This methods runs on the dedicated thread :
public void ConsumerIdByBlock()
{
while(true)
{
_mainWaiter.Wait();
while(_secondaryWaiter.Wait(5000));
List<int> localIds;
lock(_ids)
{
localIds = new List<int>(_ids);
_ids.Clear();
}
//Do the job with localIds
}
}
Run Code Online (Sandbox Code Playgroud)
第二:为最后一次更新提供一种令牌
//This methods consumes the id from the producer
private int _lastToken;
public void OnConsumeId(int newId)
{
lock(_ids)
{
_ids.Add(newId);
ThreadPool.Queue(()=>ConsumerIdByBlock(++_lastToken));
}
}
//This methods runs on the dedicated thread :
public void ConsumerIdByBlock(int myToken)
{
Thread.Sleep(5000);
List<int> localIds;
lock(_ids)
{
if(myToken !=_lastToken)
return;
localIds = new List<int>(_ids);
_ids.Clear();
}
//Do the job with localIds
}
Run Code Online (Sandbox Code Playgroud)
但我发现这些方法有点太复杂了.是否存在原生/更简单的解决方案?你会怎么做?
如果您使用已经具有通知等的线程安全队列,这将变得更加容易.该BlockingCollection使得编写生产者-消费者的东西真的很容易.
我喜欢你的"链接消费者"的想法,因为你不必修改生产者才能使用它.也就是说,生产者只是把东西塞进队列中.消费者最终如何使用它是无关紧要的.BlockingCollection然后,使用,您将拥有:
BlockingCollection<ItemType> inputQueue = new BlockingCollection<ItemType>();
BlockingCollection<List<ItemType>> intermediateQueue = new BlockingCollection<List<ItemType>>();
Run Code Online (Sandbox Code Playgroud)
您的生产者通过调用将内容添加到输入队列inputQueue.Add.您的中间消费者(称之为整合者)通过调用TryTake超时来从队列中获取内容.例如:
List<ItemType> items = new List<ItemType>();
while (!inputQueue.IsCompleted)
{
ItemType t;
while (inputQueue.TryTake(out t, TimeSpan.FromSeconds(5))
{
items.Add(t);
}
if (items.Count > 0)
{
// Add this list of items to the intermediate queue
intermediateQueue.Add(items);
items = new List<ItemType>();
}
}
Run Code Online (Sandbox Code Playgroud)
第二个使用者只读取中间队列:
foreach (var itemsList in intermediateQueue.GetConsumingEnumerable))
{
// do something with the items list
}
Run Code Online (Sandbox Code Playgroud)
不需要ManualResetEvent或不需要lock; BlockingCollection为你处理所有混乱的并发事物.
小智 1
为了扩展@Chris 的想法,当你使用一个 id 时,请记住现在是什么时间。如果自上一个列表以来已经过去了超过 5 秒,则启动一个新列表并设置一个事件。您的块消费者只需等待该事件并使用存储的列表。
另请注意,在您的第一个解决方案中,ConsumerIdByBlock 可能会在 OnConsumeId 获取锁之前退出内部,然后 ConsumerIdByBlock 会消耗至少一个 Id 太多。
| 归档时间: |
|
| 查看次数: |
407 次 |
| 最近记录: |