我有一个BlockingCollection(ConcurrentBag,50000),我试图为生产者线程使用一个非常小的有限容量50,000,以便最大化我可以在我的消费者线程的ConcurrentDictionary中处理的记录数.生产者比消费者快得多,并且会消耗大部分内存.
不幸的是,我立即注意到我的ConcurrentDictionary中的记录总数现在大大低于在我的测试数据执行时添加50,000的有界容量后的记录.我读到BlockingCollection的.add方法应该无限期地阻塞,直到集合中有空间来执行add.但是,情况似乎并非如此.
问题:
如果在BlockingCollection中的容量释放之前调用了太多的add,那么BlockingCollection的.add方法最终会超时或者无声地失败吗?
如果#1的答案是肯定的,那么在超过限制容量而不丢失数据后我可以尝试多少次添加?
如果调用了许多正在等待/阻塞容量的BlockingCollection .add()方法并且调用了CompleteAdding()方法,那些等待/阻塞添加会继续等待然后最终添加还是静默失败?
c# parallel-processing multithreading concurrentdictionary blockingcollection
有没有人知道,当迭代C#BlockingCollection <>时,是否从集合中获取元素,就像BlockingCollection.Take()所做的那样?
BlockingCollection<int> q = new BlockingCollection<int>();
[...]
foreach(int i in q)
{
//does q still contain i?
}
Run Code Online (Sandbox Code Playgroud)
谢谢
编辑:当然我的意思是BlockingCollection,但由于某种原因,我的头脑中有了BlockingQueue并使用了它.
我正在使用 ConcurrentDictionary 来存储日志行,当我需要向用户显示它们时,我会调用ToList()以生成一个列表。但奇怪的是,有些用户在列表中最先收到最近的行,而从逻辑上讲,他们应该在最后。
这是因为 ConcurrentDictionary 不能保证 IEnumerate 接口上的持久顺序,或者是什么原因?
.net concurrency list concurrentdictionary blockingcollection
我遇到了多线程的绊脚石.我想我知道问题是什么,但无法确定如何解决它.但我可能错了.
总之,我有生产者和消费者线程.生产者线程将来自外部源的数据收集到数据表中,然后将它们放入集合中.然后,消费者从集合中获取数据表.我使用BlockingCollection作为公共静态集合,因此两个线程都可以访问它,它存在于两个不同的类中.我现在将展示代码的主要部分,然后解释什么是和不起作用.
制片人主题:
try
{
dataTable.Clear();
adapter.Fill(dataTable);
dataCaptured = true;
timeout = 0;
ThreadInfo.setCurrentDate(startDate);
ThreadInfo.dataTableCollection.Add(dataTable);
}
Run Code Online (Sandbox Code Playgroud)
消费者线程
while(true)
{
DataTable testTable = ThreadInfo.dataTableCollection.Take();
foreach (DataRow datarow in testTable.Rows)
{
foreach (var item in datarow.ItemArray)
{
Console.WriteLine(item);
}
}
}
Run Code Online (Sandbox Code Playgroud)
所以我的测试显示,当生产者线程创建数据表时,它成功地将它们添加到集合中.我可以通过在add方法之前和之后使用count来看到这一点.计算每个表中的行数我还可以确认添加的表是与创建的表相同.此外,take方法还成功删除了一个表,该表与输入的表匹配.我知道这一点既可以计算集合中的表数,也可以计算'take'数据表中的行数.
我的问题是当我尝试运行foreach循环以打印出结果时.最初它工作并开始将数据打印到屏幕,但随后抛出此错误:
System.InvalidOperationException was unhandled
HResult=-2146233079
Message=Collection was modified; enumeration operation might not execute.
Source=System.Data
StackTrace:
at System.Data.RBTree`1.RBTreeEnumerator.MoveNext()
at pullPlexTable.InputThreads.dataConsumerThread() in \\srv-file01\users$\dkb\Visual Studio 2013\Projects\pullPlexTable\pullPlexTable\InputThread.cs:line 39
at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback …Run Code Online (Sandbox Code Playgroud) 我想这是一种代码审查,但这是我对生产者/消费者模式的实现。我想知道的是,是否会出现ReceivingThread()orSendingThread()方法中的 while 循环停止执行的情况。请注意,它EnqueueSend(DataSendEnqeueInfo info)是从多个不同线程调用的,我可能无法在这里使用任务,因为我肯定必须在单独的线程中使用命令。
private Thread mReceivingThread;
private Thread mSendingThread;
private Queue<DataRecievedEnqeueInfo> mReceivingThreadQueue;
private Queue<DataSendEnqeueInfo> mSendingThreadQueue;
private readonly object mReceivingQueueLock = new object();
private readonly object mSendingQueueLock = new object();
private bool mIsRunning;
EventWaitHandle mRcWaitHandle;
EventWaitHandle mSeWaitHandle;
private void ReceivingThread()
{
while (mIsRunning)
{
mRcWaitHandle.WaitOne();
DataRecievedEnqeueInfo item = null;
while (mReceivingThreadQueue.Count > 0)
{
lock (mReceivingQueueLock)
{
item = mReceivingThreadQueue.Dequeue();
}
ProcessReceivingItem(item);
}
mRcWaitHandle.Reset();
}
}
private void SendingThread()
{
while (mIsRunning)
{
mSeWaitHandle.WaitOne(); …Run Code Online (Sandbox Code Playgroud) c# multithreading producer-consumer blockingcollection tpl-dataflow
我正在运行一个线程
while(isRunning)
{
blockingCollection.Take()
}
Run Code Online (Sandbox Code Playgroud)
首先我设置isRunning为false。然后我调用thread.Interrupt它阻止blockingCollection等待新项目。之后,我Dispose为在 catch 块内运行线程的类调用我的方法。
这是最好的/正确的方法吗?
BlockingCollection仅包含添加单个项目的方法.如果我想添加一个集合怎么办?我应该只使用"foreach"循环吗?
为什么BlockingCollection不包含添加集合的方法?我认为这种方法非常有用.
MSDN上的评论在http://msdn.microsoft.com/en-us/library/dd267312.aspx上说明......
"BlockingCollection的默认集合类型是ConcurrentQueue"
这是否意味着当我在集合上运行"GetConsumingEnumerable()"时,拉出的项目正在从队列中出列,并且在使用后将标记为GC?
换句话说......在下面的代码片段中,
foreach (var item in collection.GetConsumingEnumerable())
{
//do something with item
}
Run Code Online (Sandbox Code Playgroud)
循环迭代后项目会发生什么?
我有以下场景.
我从数据库中将50个作业转换为阻塞集合.
每项工作都是一项长期工作.(可能是).所以我想在一个单独的线程中运行它们.(我知道 - 将它们作为Task.WhenAll运行并让TPL解决它可能会更好 - 但我想控制同时运行多少次)
说我想同时运行其中的5个(可配置)
我创建了5个任务(TPL),每个作业一个并行并行运行.
我想要做的是,一旦第4步中的一个作业完成,就立即在阻塞集合中选择下一个作业,并继续进行直到完成所有50个作业.
我正在考虑创建一个Static blockingCollection和一个TaskCompletionSource,它将在作业完成时调用,然后它可以再次调用使用者从队列中一次选择一个作业.我还想在每项工作上调用async/await - 但这是最重要的 - 不确定这是否对该方法有影响.
这是完成我想要做的事情的正确方法吗?
与此链接类似,但是捕获的是我想在前N个项目中的一个完成后立即处理下一个作业.毕竟N都没有完成.
更新:
好吧,如果有人想在以后使用它,我有这个代码片段正是我想要的.如下所示,创建了5个线程,每个线程在完成当前操作时启动下一个作业.在任何给定时间只有5个线程处于活动状态.我知道这可能不会像这样100%工作,并且如果与一个cpu/core一起使用,将会出现上下文切换的性能问题.
var block = new ActionBlock<Job>(
job => Handler.HandleJob(job),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
foreach (Job j in GetJobs())
block.SendAsync(j);
Run Code Online (Sandbox Code Playgroud)
工作2开始于主题:13.等待时间:3600000ms.时间:2014年8月29日下午3:14:43
工作4开始于主题:14.等待时间:15000ms.时间:2014年8月29日下午3:14:43
作业0在线程上开始:7.等待时间:600000ms.时间:2014年8月29日下午3:14:43
作业1开始于主题:12.等待时间:900000ms.时间:2014年8月29日下午3:14:43
工作3开始于主题:11.等待时间:120000ms.时间:2014年8月29日下午3:14:43
工作4完成了主题:14.2014年8月29日下午3:14:58
作业5开始于主题:14.等待时间:1800000ms.时间:2014年8月29日下午3:14:58
工作3完成线程:11.2014年8月29日下午3:16:43
Job 6从线程开始:11.等待时间:1200000ms.时间:2014年8月29日下午3:16:43
作业0完成了主题:7.2014年8月29日下午3:24:43
Job 7在线程上开始:7.等待时间:30000ms.时间:2014年8月29日下午3:24:43
工作7完成线程:7.2014年8月29日下午3:25:13
Job 8开始于主题:7.等待时间:100000ms.时间:2014年8月29日下午3:25:13
作业8完成了主题:7.2014年8月29日下午3:26:53
Job 9从线程开始:7.等待时间:900000ms.时间:2014年8月29日下午3:26:53
工作1完成线程:12.2014年8月29日下午3:29:43
作业10开始于主题:12.等待时间:300000ms.时间:2014年8月29日下午3:29:43
作业10完成了主题:12.8/29/2014 3:34:43 PM
工作11开始于主题:12.等待时间:600000ms.时间:2014年8月29日下午3:34:43
工作6完成线程:11.2014年8月29日下午3:36:43
工作12开始于主题:11.等待时间:300000ms.时间:2014年8月29日下午3:36:43
工作12完成了主题:11.2014年8月29日下午3:41:43
工作13开始于主题:11.等待时间:100000ms.时间:2014年8月29日下午3:41:43
作业9完成了主题:7.2014年8月29日下午3:41:53
Job 14从线程开始:7.等待时间:300000ms.时间:2014年8月29日下午3:41:53
工作13完成线程:11.2014年8月29日下午3:43:23
工作11完成了主题:12.2014年8月29日下午3:44:43
工作5完成线程:14.8/29/2014 3:44:58 PM …
.net c# task-parallel-library async-await blockingcollection
解决方案1和2有什么区别,_taskQ是BlockingCollection,我正在尝试实现Producer-Consumer方案.BlockingCollection使用默认的ConcurrentQueue进行内部存储.
//Solution 1
foreach (Action action in _taskQ.GetConsumingEnumerable())
{
action(); // Perform task.
Thread.Sleep(1000);
}
Run Code Online (Sandbox Code Playgroud)
没有项目时TryTake会阻止
//Solution 2
Action t;
while(_taskQ.TryTake(out t))
{
t();
Thread.Sleep(1000);
}
Run Code Online (Sandbox Code Playgroud) c# ×9
.net ×2
async-await ×1
behavior ×1
collections ×1
concurrency ×1
foreach ×1
list ×1
tpl-dataflow ×1