我正在使用(坦率的)很棒的BlockingCollection<>
类型,用于多线程,高性能的应用程序.
通过集合有很多吞吐量,在微观层面上它具有很高的性能.但是,对于每个"批次",它将始终通过标记取消令牌来结束.这导致在任何等待Take
调用时抛出异常.这没关系,但是我会确定一个返回值或输出参数来表示它,因为a)异常有明显的开销,而b)在调试时,我不想手动关闭特定的break-on-exception例外.
实现似乎很激烈,理论上我认为我可以反汇编并重新创建我自己的版本,但没有使用异常,但也许有一种不太复杂的方式?
我可以null
在集合中添加(或者如果不是,占位符)对象以表示进程应该结束,但是还需要有一种很好地中止的方法,即唤醒等待的线程并告诉他们某些事情已经发生了.
那么 - 替代收藏类型?重新创建自己的?有人滥用这个吗?
(某些上下文:BlockingCollection<>
因为它优于手动锁定,因此它具有优势Queue
.最好我可以告诉使用线程原语是极好的,在我的情况下,这里和那里几毫秒,最佳核心使用至关重要. )
编辑:我刚开了这个奖金. 我不相信Anastasiosyal的答案涵盖了我在评论中提出的问题.我知道这是一个棘手的问题.有人能协助吗?
我使用任务并行库,可量化和可重复的问题BlockingCollection<T>
,ConcurrentQueue<T>
与GetConsumingEnumerable
试图创建一个简单的管道.
简而言之,从一个线程向默认值BlockingCollection<T>
(引擎盖下依赖于a ConcurrentQueue<T>
)添加条目并不能保证它们会BlockingCollection<T>
从调用GetConsumingEnumerable()
Method的另一个线程中弹出.
我创建了一个非常简单的Winforms应用程序来重现/模拟它,它只是将整数打印到屏幕上.
Timer1
负责排队工作项...它使用一个被调用的并发字典,_tracker
以便它知道它已经添加到阻塞集合中的内容.Timer2
只记录两个BlockingCollection
&的计数状态_tracker
Paralell.ForEach
简单地遍历阻塞集合GetConsumingEnumerable()
并开始将它们打印到第二个列表框的按钮.Timer1
阻止将更多条目添加到阻止集合中.public partial class Form1 : Form
{
private int Counter = 0;
private BlockingCollection<int> _entries;
private ConcurrentDictionary<int, int> _tracker;
private CancellationTokenSource _tokenSource;
private TaskFactory _factory;
public Form1()
{
_entries = new BlockingCollection<int>();
_tracker = new ConcurrentDictionary<int, int>();
_tokenSource = new CancellationTokenSource();
_factory = new TaskFactory();
InitializeComponent(); …
Run Code Online (Sandbox Code Playgroud) 我在Albahari的一本Nutshell书中重用了C#中的示例生产者消费者队列(http://www.albahari.com/threading/part5.aspx#_BlockingCollectionT),一位同事评论道:"为什么不调用Dispose关于收集处理中的BlockingCollection?"
我找不到答案,我能想出的唯一理由是不会处理队列剩余工作量的执行.但是,当我处理队列时,为什么不停止处理呢?
除了"为什么你不应该处理BlockingCollection?" 我还有第二个问题"如果你不处理BlockingCollection,会不会受到伤害?".我想当你产生/处理大量的生产者消费者队列时,它会产生问题(不是我想要的,而只是为了知道的原因).
根据BlockingCollection.Dispose实际做了什么?BlockingCollection包含两个等待句柄(显然),所以不调用Dispose会给你一些问题.谢谢ken2k指出这一点.
我正在谈论的代码:
public class PCQueue : IDisposable
{
BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
public PCQueue (int workerCount)
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < workerCount; i++)
Task.Factory.StartNew (Consume);
}
public void Dispose() { _taskQ.CompleteAdding(); }
public void EnqueueTask (Action action) { _taskQ.Add (action); }
void Consume()
{
// This sequence that we’re enumerating will block when no elements
// are available …
Run Code Online (Sandbox Code Playgroud) 在下面的代码中,我使用CancellationToken在生产者没有生成时唤醒GetConsumingEnumerable(),我想要脱离foreach并退出Task.但我没有看到IsCancellationRequested被记录,我的Task.Wait(timeOut)等待整个timeOut期间.我究竟做错了什么?
userToken.Task = Task.Factory.StartNew(state =>
{
userToken.CancelToken = new CancellationTokenSource();
foreach (var broadcast in userToken.BroadcastQueue.GetConsumingEnumerable(userToken.CancelToken.Token))
{
if (userToken.CancelToken.IsCancellationRequested)
{
Log.Write("BroadcastQueue IsCancellationRequested");
break;
...
}
}
return 0;
}, "TaskSubscribe", TaskCreationOptions.LongRunning);
Run Code Online (Sandbox Code Playgroud)
后来...
UserToken.CancelToken.Cancel();
try
{
task.Wait(timeOut);
}
catch (AggregateException ar)
{
Log.Write("AggregateException " + ar.InnerException, MsgType.InfoMsg);
}
catch (OperationCanceledException)
{
Log.Write("BroadcastQueue Cancelled", MsgType.InfoMsg);
}
Run Code Online (Sandbox Code Playgroud) 在C#中,我想知道是否可以等到后台线程清除BlockingCollection,如果时间太长则超时.
我现在拥有的临时代码让我觉得有些不雅(因为什么时候使用它是好的做法Thread.Sleep
?):
while (_blockingCollection.Count > 0 || !_blockingCollection.IsAddingCompleted)
{
Thread.Sleep(TimeSpan.FromMilliseconds(20));
// [extra code to break if it takes too long]
}
Run Code Online (Sandbox Code Playgroud) 我编写了以下方法来批处理一个巨大的CSV文件.我们的想法是从文件中读取一大块行到内存中,然后将这些行分成固定大小的批量.获得分区后,将这些分区发送到服务器(同步或异步),这可能需要一段时间.
private static void BatchProcess(string filePath, int chunkSize, int batchSize)
{
List<string> chunk = new List<string>(chunkSize);
foreach (var line in File.ReadLines(filePath))
{
if (chunk.Count == chunk.Capacity)
{
// Partition each chunk into smaller chunks grouped on column 1
var partitions = chunk.GroupBy(c => c.Split(',')[0], (key, g) => g);
// Further breakdown the chunks into batch size groups
var groups = partitions.Select(x => x.Select((i, index) =>
new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
// Get batches …
Run Code Online (Sandbox Code Playgroud) c# csv task-parallel-library blockingcollection tpl-dataflow
我有一个递归问题,消费者在树的每个级别做一些工作,然后需要递归树并在下一级执行相同的工作.
我想使用ConcurrentBag/BlockingCollection等并行运行它.在这种情况下,队列的使用者也是队列的生产者!
我的问题是这样的:使用BlockingCollection,我可以编写非常简单的foreach逻辑来对项目进行队列化,并对新项目进行排队 - 当队列为空时,阻塞集合将正确阻塞,并等待其中一个新工作生成消费者.
但我如何知道所有消费者是否都在阻止?!
我知道CompleteAdding(),但这似乎没有服务,因为你实际完成的唯一时间是所有的生产者都完成生产并且队列是空的 - 因为它们都会阻塞,所以没有人"免费"设置CompleteAdding().有没有办法检测到这个?(也许是一个可以在阻挡时触发的事件,并在解锁时再次触发?)
我可以手动处理,不使用foreach,但手动有一段时间(!完整)循环,并使用TryTake,但后来我需要手动睡眠,这似乎是无效的(完全的原因是阻止集合vs只是首先是并发集合!)每次循环,如果TryTake为假,我可以设置一个空闲标志,然后有一个Master检查队列是否为空,并且所有线程都空闲,设置一个完整的标志,但同样,这似乎很糟糕.
直觉告诉我有一些方法可以使用Blocking Collection来做到这一点,但我无法达到目的.
无论如何,任何人都有一个良好的模式,当消费者是生产者,并能够检测何时释放所有块将是真棒
我有一个生产者线程和多个消费者线程的以下代码.你知道多个消费者是否是线程安全的.例如,线程1是否正在消耗,而线程2并行消耗并更改线程1中使用的项的值?
namespace BlockingColl
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private void button1_Click(object sender, EventArgs e)
{
try
{
for (int i = 0; i < 3; i++)
{
ThreadPool.QueueUserWorkItem((x) =>
{
foreach (var item in bc.GetConsumingEnumerable())
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " - " + item + " - " + DateTime.Now.ToString("MM/dd/yyyy hh:mm:ss.fff tt"));
}
});
}
}
catch (Exception)
{
throw;
}
}
private void button2_Click(object sender, EventArgs e)
{
for (int i = …
Run Code Online (Sandbox Code Playgroud) 我有一个流程生成工作,第二个流程使用BlockingCollection<>
这个工作.当我关闭我的程序时,我需要我的消费者停止消费工作,但我仍然需要快速记录待处理但尚未消耗的工作.
现在,我的消费者产生了一个有foreach (<object> in BlockingCollection.GetConsumingEnumerable())
循环的线程.当我停止我的程序时,我的制作人打电话Consumer.BlockingCollection.CompleteAdding()
.我发现我的消费者继续处理队列中的所有内容.
谷歌搜索问题告诉我,我需要使用CancellationToken
.所以我试了一下:
private void Process () { // This method runs in a separate thread
try {
foreach (*work* in BlockingCollection.GetConsumingEnumerable(CancellationToken)) {
// Consume
}
}
catch (OperationCancelledException) {
foreach (*work* in BlockingCollection.GetConsumingEnumerable()) {
// quickly log
}
}
}
Run Code Online (Sandbox Code Playgroud)
我的制作人有:
private CancellationTokenSource StopFlag = new CancellationTokenSource ();
MyConsumer.CancellationToken = StopFlag.Token;
// Make the consumer spawn it's consuming thread...
StopFlag.Cancel ();
MyConsumer.BlockingCollection.CompleteAdding ();
Run Code Online (Sandbox Code Playgroud)
当我尝试这个时,我没有得到OperationCancelledException发生的迹象.
这个问题试图解释使用取消令牌,但似乎它没有正确使用它.(论证:如果它有效,那么它"足够正确".) …
c# ×9
.net ×3
consumer ×2
cancellation ×1
concurrency ×1
csv ×1
dispose ×1
java ×1
producer ×1
tpl-dataflow ×1
wpf ×1