标签: blockingcollection

.Net Concurrent BlockingCollection有内存泄漏?

我正在使用带有> 的生产者/消费者模式System.Collection.Concurrent.BlockingCollection<DataTable来从数据库(生产者)检索数据并在数据(消费者)上创建Lucene索引.

Producer一次抓取10000条记录并将该添加到BlockingCollection<DataTable>.然后消费者(稍微慢一点)抓住那些10000并创建一个索引.

阻塞集合被限制为<DataTable>每个10000行中的5个.

起初程序运行得很好,但是在它获得大约150000行后,我注意到我的计算机内存最大化并且它慢慢爬行.

似乎BlockingCollection无法null在获取项目后将基础数组槽设置为.

码:

    private static LuceneIndex index;
    private static BlockingCollection<DataTable> blockingCol;

    private static void Producer()
    {
        while (true)
        {
            //...get next 10000 rows
            DataTable data = GetNextSet();
            if(data.Row.Count > 0)
                blockingCol.Add(products);
            else
                break;
        }
    }

    private static void Consumer()
    {
        while (!BlockingCol.IsCompleted || BlockingCol.Count > 0)
        {
            DataTable data = blockingCol.Take();
            index.UpdateIndex(GetLuceneDocs(data));
        }
    }


 public static void Main(System.String[] args)
 {
            index = new LuceneIndex();
            blockingCol …
Run Code Online (Sandbox Code Playgroud)

.net memory-leaks blockingcollection

6
推荐指数
1
解决办法
2055
查看次数

如何在没有竞争条件和异常的情况下检测BlockingCollection的AddingCompleted?

我使用的BlockingCollection{T}是仅由一个线程填充且仅由一个线程消耗的线程。生产和消费物品运行良好。问题出在这个操作的最后。任务在 处阻塞(如预期)GetConsumingEnumerableCompleteAdding调用任务后将处理BlockingCollection并完成,没有任何异常。到目前为止,一切都很好。

现在我有一个线程将项目添加到BlockingCollection. 该线程必须进行测试IsAddingCompleted,然后必须添加该项目。IsAddingCompleted但请求和添加项目之间存在竞争条件。有一个TryAdd- 方法,但如果添加已经完成,也会引发异常。

如何在没有额外锁定的情况下添加已完成的项目或测试?为什么会TryAdd抛出异常?如果添加已经完成,返回false即可。

非常简化的代码如下所示:

private BlockingCollection<string> _items = new BlockingCollection<string>();

public void Start()
{
    Task.Factory.StartNew(
        () =>
            {
                foreach (var item in this._items.GetConsumingEnumerable())
                {
                }

                this._items.Dispose();
            });

    Thread.Sleep(50); // Wait for Task

    this._items.CompleteAdding(); // Complete adding
}

public void ConsumeItem(string item)
{
    if (!this._items.IsAddingCompleted)
    {
        this._items.Add(item);
    }
}
Run Code Online (Sandbox Code Playgroud)

是的,我知道这段代码没有意义,因为几乎没有机会添加任何项目,并且 foreach 循环没有注意到。消耗任务对我的问题来说并不重要。

问题如ConsumeItem-method 所示。我可以添加一个额外的锁(信号量)ConsumeItem和 …

c# multithreading race-condition blockingcollection

6
推荐指数
0
解决办法
756
查看次数

在网络应用程序中使用Pika BlockingConnection是否可以?

我有点困惑BlockingConnectionAsyncoreConnection.我想从Django应用程序向RabbitMQ队列发送一些消息.使用全局BlockingConnection对象可以吗?

谢谢.

python django rabbitmq pika blockingcollection

6
推荐指数
1
解决办法
1217
查看次数

使用Task.WhenAll与BlockingCollection生成的无限任务

我将后台任务添加到阻止集合(在后台添加).

我在GetConsumingEnumerable返回的Enumerable上等待Task.WhenAll.

我的问题是:Task.WhenAll的重载是否接收到IEnumerable"准备好"可能会收到无穷无尽的任务?

我只是不确定我是否可以这样做,或者它是否意味着以这种方式使用?

private async Task RunAsync(TimeSpan delay, CancellationToken cancellationToken)
{
    using (BlockingCollection<Task> jobcollection = new BlockingCollection<Task>())
    {
        Task addingTask = Task.Run(async () =>
        {
            while (true)
            {
                DateTime utcNow = DateTime.UtcNow;
                var jobs = Repository.GetAllJobs();
                foreach (var job in GetRootJobsDue(jobs, utcNow))
                {
                    jobcollection.Add(Task.Run(() => RunJob(job, jobs, cancellationToken, utcNow), cancellationToken), cancellationToken);
                }

                await Task.Delay(delay, cancellationToken);
            }
        }, cancellationToken);

        await Task.WhenAll(jobcollection.GetConsumingEnumerable(cancellationToken));
    }
}
Run Code Online (Sandbox Code Playgroud)

c# task async-await blockingcollection

6
推荐指数
2
解决办法
695
查看次数

使用BlockingCollections在队列中运行多个线程

我的程序有3个功能.每个函数都会获取一个项目列表并填充某些信息.例如

class Item {
 String sku,upc,competitorName;
 double price;
}
Run Code Online (Sandbox Code Playgroud)

函数F1采用List并填充upc

功能F2取List(F1的输出)并填写价格.

功能F3取List(输出F2)并填充competitorName

F1可以一次处理5个项目, F2可以一次处理20个项目,F3也可以处理20个项目 .

现在我正在连续运行F1 - > F2 - > F3,因为F2需要来自F1的信息(UPC代码).F3需要F2的价格.

我想通过连续运行F1运行而不是等待F2和F3完成来使这个过程高效.F1执行并输出到队列中,然后F2一次取20个项目并处理它们.然后跟随F3.

如何通过使用BlockingCollection和Queue来实现这一目标?

task-parallel-library blockingcollection

6
推荐指数
1
解决办法
137
查看次数

并发集合和独特元素

我有BlockingCollection重复的元素.如何修改它以添加或获取不同的元素?

c# parallel-processing blockingcollection

5
推荐指数
1
解决办法
2101
查看次数

使用BlockingCollection更新ObservableCollection

我订阅了一项服务,该服务将在收到新元素时引发Event,然后将此元素添加到中BlockingCollection
我正在运行第二个线程,该线程将循环运行BlockingCollection以添加/更新可观察集合中的元素。

问题是您如何添加ObservableCollection?我知道我不能只.add对这种类型的集合执行操作,因为它需要在UI线程上完成。所以我尝试使用不同的ObservableCollection子类,该子类使用分派器来编组元素的添加,但是每一次,我都会遇到相同的错误

“未知模块中发生了'System.StackOverflowException类型的未处理的异常。”

故障排除提示如下:

确保您没有无限循环或无限递归。

好吧,实际上,我确实有某种无限循环,因为一直在BlockingQueue接收新元素,例如每秒5到10。
如果我不将控件绑定到observablecollectionough,或者使用List代替,则不会出现异常。

Class ElementHolder
{
    private ExternalService _externalService;
    private ObservableCollection<Element> _elementsList = new ObservableCollection<Element>();
    private BlockingCollection<Element> _elementsReceived = new BlockingCollection<Element>();

    public ObservableCollection<Element> ElementsList
    {
        get
        {
            return _elementList;
        }
        set
        {
            _elementList = value;
        }
    }

public ElementHolder()
    {
        Task.Factory.StartNew(ReadElements);
        _externalService = new ExternalService();
        _externalService.ReceivedNewElement += new Action<Element>(o => _elementsReceived.Add(o));
        _externalService.Subscribe();
    }

private void ReadElements()
    {
        foreach (Element element in …
Run Code Online (Sandbox Code Playgroud)

c# wpf observablecollection blockingcollection

5
推荐指数
1
解决办法
1592
查看次数

BlockingCollection <T>使用TPL DataFlow进行批处理

有没有办法批量阻止集合中的项集合.例如

我有一个消息传递总线发布者调用blockingCollection.Add()

还有一个像这样创建的消费线程:

Task.Factory.StartNew(() =>
        {
            foreach (string value in blockingCollection.GetConsumingEnumerable())
                {
                    Console.WriteLine(value);
                }
        });
Run Code Online (Sandbox Code Playgroud)

但是,我只希望控制台在阻塞集合上有10个项目后写入,而GetConsumingEnumerable()总是在每个项目添加后触发.我可以为此编写自己的队列但是如果可能的话我想使用阻塞集合吗?

c# task-parallel-library blockingcollection

5
推荐指数
1
解决办法
916
查看次数

一起使用 BlockingCollection&lt;T&gt; 和 TPL 数据流时出现死锁

我已经编写了一个复制该问题的示例测试。这不是我的实际代码,我尝试编写一个小代码。如果将边界容量增加到迭代次数,从而有效地使其没有边界,则不会出现死锁,如果将最大并行度设置为较小的数字(例如 1),则不会出现死锁。

再说一次,我知道下面的代码不是很好,但我实际发现的代码要大得多并且难以理解。基本上有一个与远程资源的连接的阻塞对象池,并且流中的几个块使用了该连接。

关于如何解决这个问题有什么想法吗?乍一看,这似乎是数据流的问题。当我中断查看线程时,我看到许多线程在 Add 上被阻塞,0 个线程在 take 上被阻塞。addBlocks 出站队列中有几个项目尚未传播到 takeblock,因此它被卡住或死锁。

    var blockingCollection = new BlockingCollection<int>(10000);

    var takeBlock = new ActionBlock<int>((i) =>
    {
        int j = blockingCollection.Take();

    }, new ExecutionDataflowBlockOptions()
           {
              MaxDegreeOfParallelism = 20,
              SingleProducerConstrained = true
           });

    var addBlock = new TransformBlock<int, int>((i) => 
    {
        blockingCollection.Add(i);
        return i;

    }, new ExecutionDataflowBlockOptions()
           {
              MaxDegreeOfParallelism = 20
           });

    addBlock.LinkTo(takeBlock, new DataflowLinkOptions()
          {
             PropagateCompletion = true
          });

    for (int i = 0; i < 100000; i++)
    {
        addBlock.Post(i);
    }

    addBlock.Complete();
    await addBlock.Completion; …
Run Code Online (Sandbox Code Playgroud)

c# multithreading task-parallel-library blockingcollection tpl-dataflow

5
推荐指数
1
解决办法
877
查看次数

阻塞集合上的 Take/TryTake 和 Add/TryAdd 之间的区别

我一直在试图让我的头部周围的阻塞收集和我碰到Take()TryTake()Add()TryAdd()

我知道如果没有要取的项目,Take()将等到添加一个项目,类似地,Add()如果集合已达到其最大限制,它将等到项目被删除。

根据Josheph Albahari 关于并行编程的文章

“如果集合大小有界,Add 和 TryAdd 可能会阻塞;当集合为空时,Take 和 TryTake 会阻塞。”

因此Take()TryTake()两者都等待添加项目。那么,如果我们不提供任何超时或取消令牌,Take()和之间有什么区别TryTake(),不应该TryTake() return false直接而不是等待?和一样TryAdd()吗?

.net c# multithreading asynchronous blockingcollection

5
推荐指数
2
解决办法
3717
查看次数