如何迭代BufferBlock <T>中的项目?

Ans*_*hul 3 .net c# producer-consumer task-parallel-library tpl-dataflow

我最近开始使用.NET 4.5中的TPL Dataflow库,并且块的整个概念对我来说是新的.我正在我的应用程序中实现生产者 - 消费者队列,我需要防止重复的消息被放入队列,因此需要检查消息是否已经排队.我正在使用一种BufferBlock<Message>类型(Message是一种自定义类型).BufferBlock具有Count属性但在此问题中没有帮助,因为需要唯一标识消息.

有没有办法检查是否BufferBlock包含一个项目或检查所有项目并检查它们?是否有可能转换BufferBlock为允许迭代项目的东西?我正在按照我在MSDN上看到的一个例子,它没有检查项目是否在队列中,但我认为检查队列的内容是一个非常需要的操作.任何帮助表示赞赏.

spe*_*der 5

而不是闯入BufferBlock,为什么不能代替插入TransformManyBlock到,这是否对你的链条?您可以使用a HashSet,Add方法仅true在尚未添加项目时返回.它最终变得非常简单,但存储需求明显随着时间而增加......

void Main()
{
    var bb = new BufferBlock<string>();
    var db = DataflowEx.CreateDistinctBlock<string>();
    var ab = new ActionBlock<string>(x => Console.WriteLine(x));
    bb.LinkTo(db);
    db.LinkTo(ab);
    bb.Post("this");
    bb.Post("this");
    bb.Post("this");
    bb.Post("is");
    bb.Post("is");
    bb.Post("a");
    bb.Post("test");
}

public class DataflowEx
{
    public static TransformManyBlock<T, T> CreateDistinctBlock<T>()
    {
        var hs = new HashSet<T>();
        //hs will be captured in the closure of the delegate
        //supplied to the TransformManyBlock below and therefore
        //will have the same lifespan as the returned block.
        //Look up the term "c# closure" for more info
        return new TransformManyBlock<T, T>(
                         x => Enumerable.Repeat(x, hs.Add(x) ? 1 : 0));
    }
}
Run Code Online (Sandbox Code Playgroud)

这样做的原因是,就像Linq的SelectMany一样,TransformManyBlock有效地展平了列表列表.因此,TransformManyBlock接受一个返回an的委托IEnumerable<T>,但一次提供返回的委托中的项目IEnumerable<T>.通过返回IEnumerable<T>其中包含0或1个项目的内容,我们可以有效地创建Where类似行为,允许项目通过或阻止它通过,具体取决于是否满足某些谓词.在这种情况下,谓词是否我们可以将项添加到捕获的HashSet中.