标签: blockingcollection

有界BlockingCollections可以在添加期间丢失数据

我有一个BlockingCollection(ConcurrentBag,50000),我试图为生产者线程使用一个非常小的有限容量50,000,以便最大化我可以在我的消费者线程的ConcurrentDictionary中处理的记录数.生产者比消费者快得多,并且会消耗大部分内存.

不幸的是,我立即注意到我的ConcurrentDictionary中的记录总数现在大大低于在我的测试数据执行时添加50,000的有界容量后的记录.我读到BlockingCollection的.add方法应该无限期地阻塞,直到集合中有空间来执行add.但是,情况似乎并非如此.

问题:

  1. 如果在BlockingCollection中的容量释放之前调用了太多的add,那么BlockingCollection的.add方法最终会超时或者无声地失败吗?

  2. 如果#1的答案是肯定的,那么在超过限制容量而不丢失数据后我可以尝试多少次添加?

  3. 如果调用了许多正在等待/阻塞容量的BlockingCollection .add()方法并且调用了CompleteAdding()方法,那些等待/阻塞添加会继续等待然后最终添加还是静默失败?

c# parallel-processing multithreading concurrentdictionary blockingcollection

5
推荐指数
1
解决办法
2593
查看次数

foreach是否从C#BlockingCollection中删除?

有没有人知道,当迭代C#BlockingCollection <>时,是否从集合中获取元素,就像BlockingCollection.Take()所做的那样?

BlockingCollection<int> q = new BlockingCollection<int>();
[...]
foreach(int i in q)
{
    //does q still contain i?
}
Run Code Online (Sandbox Code Playgroud)

谢谢

编辑:当然我的意思是BlockingCollection,但由于某种原因,我的头脑中有了BlockingQueue并使用了它.

c# foreach behavior blockingqueue blockingcollection

5
推荐指数
1
解决办法
1852
查看次数

ConcurrentDictionary 的列表顺序是否有保证?

我正在使用 ConcurrentDictionary 来存储日志行,当我需要向用户显示它们时,我会调用ToList()以生成一个列表。但奇怪的是,有些用户在列表中最先收到最近的行,而从逻辑上讲,他们应该在最后。

这是因为 ConcurrentDictionary 不能保证 IEnumerate 接口上的持久顺序,或者是什么原因?

.net concurrency list concurrentdictionary blockingcollection

4
推荐指数
1
解决办法
3092
查看次数

在C#中打印从BlockingCollection获取的数据时,Foreach抛出错误

我遇到了多线程的绊脚石.我想我知道问题是什么,但无法确定如何解决它.但我可能错了.

总之,我有生产者和消费者线程.生产者线程将来自外部源的数据收集到数据表中,然后将它们放入集合中.然后,消费者从集合中获取数据表.我使用BlockingCollection作为公共静态集合,因此两个线程都可以访问它,它存在于两个不同的类中.我现在将展示代码的主要部分,然后解释什么是和不起作用.

制片人主题:

try
{
     dataTable.Clear();
     adapter.Fill(dataTable);
     dataCaptured = true;
     timeout = 0;
     ThreadInfo.setCurrentDate(startDate);
     ThreadInfo.dataTableCollection.Add(dataTable);
}
Run Code Online (Sandbox Code Playgroud)

消费者线程

while(true)
{
     DataTable testTable = ThreadInfo.dataTableCollection.Take();
     foreach (DataRow datarow in testTable.Rows)
     {
          foreach (var item in datarow.ItemArray)
          {
                Console.WriteLine(item);
          }
     }
}
Run Code Online (Sandbox Code Playgroud)

所以我的测试显示,当生产者线程创建数据表时,它成功地将它们添加到集合中.我可以通过在add方法之前和之后使用count来看到这一点.计算每个表中的行数我还可以确认添加的表是与创建的表相同.此外,take方法还成功删除了一个表,该表与输入的表匹配.我知道这一点既可以计算集合中的表数,也可以计算'take'数据表中的行数.

我的问题是当我尝试运行foreach循环以打印出结果时.最初它工作并开始将数据打印到屏幕,但随后抛出此错误:

System.InvalidOperationException was unhandled
  HResult=-2146233079
  Message=Collection was modified; enumeration operation might not execute.
  Source=System.Data
  StackTrace:
       at System.Data.RBTree`1.RBTreeEnumerator.MoveNext()
       at pullPlexTable.InputThreads.dataConsumerThread() in \\srv-file01\users$\dkb\Visual Studio 2013\Projects\pullPlexTable\pullPlexTable\InputThread.cs:line 39
       at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
       at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback …
Run Code Online (Sandbox Code Playgroud)

c# multithreading blockingcollection

4
推荐指数
1
解决办法
234
查看次数

使用线程和 EventWaitHandle 的生产者/消费者模式

我想这是一种代码审查,但这是我对生产者/消费者模式的实现。我想知道的是,是否会出现ReceivingThread()orSendingThread()方法中的 while 循环停止执行的情况。请注意,它EnqueueSend(DataSendEnqeueInfo info)是从多个不同线程调用的,我可能无法在这里使用任务,因为我肯定必须在单独的线程中使用命令。

private Thread mReceivingThread;
private Thread mSendingThread;
private Queue<DataRecievedEnqeueInfo> mReceivingThreadQueue;
private Queue<DataSendEnqeueInfo> mSendingThreadQueue;
private readonly object mReceivingQueueLock = new object();
private readonly object mSendingQueueLock = new object();
private bool mIsRunning;
EventWaitHandle mRcWaitHandle;
EventWaitHandle mSeWaitHandle;

private void ReceivingThread()
{
    while (mIsRunning)
    {
        mRcWaitHandle.WaitOne();
        DataRecievedEnqeueInfo item = null;
        while (mReceivingThreadQueue.Count > 0)
        {
            lock (mReceivingQueueLock)
            {
                item = mReceivingThreadQueue.Dequeue();
            }
            ProcessReceivingItem(item);
        }
        mRcWaitHandle.Reset();
    }
}

private void SendingThread()
{
    while (mIsRunning)
    {
        mSeWaitHandle.WaitOne(); …
Run Code Online (Sandbox Code Playgroud)

c# multithreading producer-consumer blockingcollection tpl-dataflow

4
推荐指数
1
解决办法
5643
查看次数

C# 使用 BlockingCollection 正确中止始终运行的线程

我正在运行一个线程

while(isRunning) 
{ 
    blockingCollection.Take()
}
Run Code Online (Sandbox Code Playgroud)

首先我设置isRunning为false。然后我调用thread.Interrupt它阻止blockingCollection等待新项目。之后,我Dispose为在 catch 块内运行线程的类调用我的方法。

这是最好的/正确的方法吗?

c# multithreading blockingcollection

4
推荐指数
1
解决办法
485
查看次数

将Collection添加到BlockingCollection

BlockingCollection仅包含添加单个项目的方法.如果我想添加一个集合怎么办?我应该只使用"foreach"循环吗?

为什么BlockingCollection不包含添加集合的方法?我认为这种方法非常有用.

c# blockingcollection

3
推荐指数
1
解决办法
2098
查看次数

GetConsumingEnumerable实际上是否从BlockingCollection中删除了一个项目?

MSDN上的评论在http://msdn.microsoft.com/en-us/library/dd267312.aspx上说明......

"BlockingCollection的默认集合类型是ConcurrentQueue"

这是否意味着当我在集合上运行"GetConsumingEnumerable()"时,拉出的项目正在从队列中出列,并且在使用后将标记为GC?

换句话说......在下面的代码片段中,

foreach (var item in collection.GetConsumingEnumerable())
        {
            //do something with item
        }
Run Code Online (Sandbox Code Playgroud)

循环迭代后项目会发生什么?

c# collections blockingcollection

3
推荐指数
1
解决办法
2660
查看次数

阻止收集过程一次n个项目 - 一旦完成,就会继续

我有以下场景.

  1. 我从数据库中将50个作业转换为阻塞集合.

  2. 每项工作都是一项长期工作.(可能是).所以我想在一个单独的线程中运行它们.(我知道 - 将它们作为Task.WhenAll运行并让TPL解决它可能会更好 - 但我想控制同时运行多少次)

  3. 说我想同时运行其中的5个(可配置)

  4. 我创建了5个任务(TPL),每个作业一个并行并行运行.

我想要做的是,一旦第4步中的一个作业完成,就立即在阻塞集合中选择下一个作业,并继续进行直到完成所有50个作业.

我正在考虑创建一个Static blockingCollection和一个TaskCompletionSource,它将在作业完成时调用,然后它可以再次调用使用者从队列中一次选择一个作业.我还想在每项工作上调用async/await - 但这是最​​重要的 - 不确定这是否对该方法有影响.

这是完成我想要做的事情的正确方法吗?

链接类似,但是捕获的是我想在前N个项目中的一个完成后立即处理下一个作业.毕竟N都没有完成.

更新:

好吧,如果有人想在以后使用它,我有这个代码片段正是我想要的.如下所示,创建了5个线程,每个线程在完成当前操作时启动下一个作业.在任何给定时间只有5个线程处于活动状态.我知道这可能不会像这样100%工作,并且如果与一个cpu/core一起使用,将会出现上下文切换的性能问题.

var block = new ActionBlock<Job>(
                job => Handler.HandleJob(job), 
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

              foreach (Job j in GetJobs())
                  block.SendAsync(j);
Run Code Online (Sandbox Code Playgroud)

工作2开始于主题:13.等待时间:3600000ms.时间:2014年8月29日下午3:14:43

工作4开始于主题:14.等待时间:15000ms.时间:2014年8月29日下午3:14:43

作业0在线程上开始:7.等待时间:600000ms.时间:2014年8月29日下午3:14:43

作业1开始于主题:12.等待时间:900000ms.时间:2014年8月29日下午3:14:43

工作3开始于主题:11.等待时间:120000ms.时间:2014年8月29日下午3:14:43

工作4完成了主题:14.2014年8月29日下午3:14:58

作业5开始于主题:14.等待时间:1800000ms.时间:2014年8月29日下午3:14:58

工作3完成线程:11.2014年8月29日下午3:16:43

Job 6从线程开始:11.等待时间:1200000ms.时间:2014年8月29日下午3:16:43

作业0完成了主题:7.2014年8月29日下午3:24:43

Job 7在线程上开始:7.等待时间:30000ms.时间:2014年8月29日下午3:24:43

工作7完成线程:7.2014年8月29日下午3:25:13

Job 8开始于主题:7.等待时间:100000ms.时间:2014年8月29日下午3:25:13

作业8完成了主题:7.2014年8月29日下午3:26:53

Job 9从线程开始:7.等待时间:900000ms.时间:2014年8月29日下午3:26:53

工作1完成线程:12.2014年8月29日下午3:29:43

作业10开始于主题:12.等待时间:300000ms.时间:2014年8月29日下午3:29:43

作业10完成了主题:12.8/29/2014 3:34:43 PM

工作11开始于主题:12.等待时间:600000ms.时间:2014年8月29日下午3:34:43

工作6完成线程:11.2014年8月29日下午3:36:43

工作12开始于主题:11.等待时间:300000ms.时间:2014年8月29日下午3:36:43

工作12完成了主题:11.2014年8月29日下午3:41:43

工作13开始于主题:11.等待时间:100000ms.时间:2014年8月29日下午3:41:43

作业9完成了主题:7.2014年8月29日下午3:41:53

Job 14从线程开始:7.等待时间:300000ms.时间:2014年8月29日下午3:41:53

工作13完成线程:11.2014年8月29日下午3:43:23

工作11完成了主题:12.2014年8月29日下午3:44:43

工作5完成线程:14.8/29/2014 3:44:58 PM …

.net c# task-parallel-library async-await blockingcollection

3
推荐指数
1
解决办法
942
查看次数

TryTake vs GetConsumingEnumerable

解决方案1和2有什么区别,_taskQ是BlockingCollection,我正在尝试实现Producer-Consumer方案.BlockingCollection使用默认的ConcurrentQueue进行内部存储.

//Solution 1
foreach (Action action in _taskQ.GetConsumingEnumerable())
{
    action(); // Perform task.
    Thread.Sleep(1000);
}
Run Code Online (Sandbox Code Playgroud)

没有项目时TryTake会阻止

//Solution 2
Action t;
while(_taskQ.TryTake(out t))
{
    t();
    Thread.Sleep(1000);
}
Run Code Online (Sandbox Code Playgroud)

c# blockingcollection

3
推荐指数
1
解决办法
4809
查看次数