任务并行库中的BlockingCollection不会自动释放基础实例的引用

Ste*_*eve 6 c# multithreading task-parallel-library

我使用a BlockingCollection在C#4.0中实现生产者 - 消费者模式.

BlockingCollection拥有这占用了不少的内存物品.我想让生产者一次从BlockingCollection中取出一个项目并进行处理.

我想通过使用foreach BlockingCollection.GetConsumingEnumerable(),每次BlockingCollection都会从底层队列中删除项目(这意味着所有与引用一起)所以在处理项目的Process()方法的末尾,项目可能是垃圾集.

但是这是错误的.似乎foreach循环BlockingCollection.GetConsumingEnumerable()确实保存了输入队列的项目的所有引用.所有物品都被保持(因此防止被垃圾收集)直到踩出foreach循环.

BlockingCollection.GetConsumingEnumerable()我没有使用简单的foreach循环,而是使用while循环测试BlockingCollection.IsComplete标记,并在循环内部BlockingCollection.Take()用于获取耗材项.我认为它BlockingCollection.Take()具有类似的效果List.Remove(),它将从BlockingCollection中删除该项的引用.但这又是错误的.所有项目仅在while循环之外收集垃圾.

所以我的问题是,我们如何轻松实现这一要求,以便BlockingCollection可能会占用内存消耗的项目,并且一旦消费者使用了每个项目就可以进行垃圾回收?非常感谢您的帮助.

编辑:根据要求,添加了一个简单的演示代码:

// Entity is what we are going to process.
// The finalizer will tell us when Entity is going to be garbage collected.
class Entity
{
    private static int counter_;
    private int id_;
    public int ID { get{ return id_; } }
    public Entity() { id_ = counter++; }
    ~Entity() { Console.WriteLine("Destroying entity {0}.", id_); }
}

...

private BlockingCollection<Entity> jobQueue_ = new BlockingCollection<Entity>();
private List<Task> tasks_ = new List<Task>();

// This is the method to launch and wait for the tasks to finish the work.
void Run()
{
    tasks_.Add(Task.Factory.StartNew(ProduceEntity);
    Console.WriteLine("Start processing.");
    tasks_.Add(Task.Factory.StartNew(ConsumeEntity);
    Task.WaitAll(tasks_.ToArray());
}

// The producer creates Entity instances and add them to BlockingCollection.
void ProduceEntity()
{
    for(int i = 0; i < 10; i ++) // We are adding totally 10 entities.
    {
        var newEntity = new Entity();
        Console.WriteLine("Create entity {0}.", newEntity.ID);
        jobQueue_.Add(newEntity);
    }
    jobQueue_.CompleteAdding();
}

// The consumer takes entity, process it (and what I need: destroy it).
void ConsumeEntity()
{
    while(!jobQueue_.IsCompleted){
        Entity entity;
        if(jobQueue_.TryTake(entity))
        {
            Console.WriteLine("Process entity {0}.", entity.ID);
            entity = null;

            // I would assume after GC, the entity will be finalized and garbage collected, but NOT.
            GC.Collect();
            GC.WaitForPendingFinalizers();
            GC.Collect();
        }
    }
    Console.WriteLine("Finish processing.");
}
Run Code Online (Sandbox Code Playgroud)

输出是所有创建和处理消息,然后是"完成处理".然后是来自实体的所有销毁消息.并且创建实体消息显示从0到9的Entity.ID以及显示从9到0的Entity.ID的销毁消息.

编辑:

即使我设置了BlockingCollection的绑定容量,所有进入它的项目只有在循环退出时才会终止,这很奇怪.

D.F*_*hnn 6

ConcurrentQueue包含具有32个项目的内部数组的段.在段被垃圾收集之前,实体项不会被垃圾收集.从队列中取出所有32个项目后会发生这种情况.如果您更改示例以添加32个项目,您将在"完成处理"之前看到"销毁实体"消息.


Jac*_*eja 2

BlockingCollection 是否继续保留引用取决于它所使用的集合类型。

默认的集合类型是BlockingCollection<T>.ConcurrentQueue<T>

因此垃圾收集行为将取决于收集类型。在 > 的情况下ConcurrentQueue<T,这是一个 FIFO 结构,所以如果在从队列中删除引用后没有释放数据结构中的引用(这是队列的定义),我会感到非常惊讶!

您究竟如何确定对象没有被垃圾收集?