如何从任何两个BlockingCollections中获取优先于第一个集合的项目?

Yac*_*sad 6 c# producer-consumer

我有两个BlockingCollection<T>对象,collection1collection2.我想要使​​用这些集合中的项目优先考虑项目collection1.也就是说,如果两个集合都有项目,我想先从中获取项目collection1.如果他们都没有物品,我想等待物品可用.

我有以下代码:

public static T Take<T>(
    BlockingCollection<T> collection1,
    BlockingCollection<T> collection2) where T:class
{
    if (collection1.TryTake(out var item1))
    {
        return item1;
    }

    T item2;

    try
    {
        BlockingCollection<T>.TakeFromAny(
            new[] { collection1, collection2 },
            out item2);
    }
    catch (ArgumentException)
    {
        return null;
    }

    return item2;
}
Run Code Online (Sandbox Code Playgroud)

在两个集合上调用此代码nullCompleteAdding,它们都会返回,并且它们都是空的.

我对此代码的主要问题是该TakeFromAny方法的文档指定TakeFromAny将在"集合"上调用ArgumentExceptionif CompleteAdding:

ArgumentException的

collections参数是一个0长度数组或包含null元素,或者已在集合上调用CompleteAdding().

如果CompleteAdding在任何集合上调用它会抛出吗?或两个集合?

如果CompleteAdding被调用并且集合仍然有一些项目,它会抛出吗?

我需要一种可靠的方法来做到这一点.

在这段代码中,我试图从collection1第一个开始,因为TakeFromAny如果两个集合有项目,文档不会对收集订单提供任何保证.

这也意味着如果两个集合都是空的,然后它们会在以后同时收到项目,那么我可能会从collection2第一个获得一个项目,这很好.

编辑:

我将项目添加到两个集合(而不仅仅是单个集合)的原因是第一个集合没有上限,第二个集合没有.

有兴趣为什么我需要这些的人的更多细节:

我在一个名为ProceduralDataflow的开源项目中使用它.有关详细信息,请参阅此处https://github.com/ymassad/ProceduralDataflow

数据流系统中的每个处理节点都有两个集合,一个集合将包含第一次出现的项目(因此我需要在需要时减慢生产者的速度),另一个集合将包含第二个(或第三个,... )次(由于数据流中的循环).

一个集合没有上限的原因是我不希望由于数据流中的循环而导致死锁.

Iva*_*oev 4

首先,简短回答您的具体问题。

CompleteAdding如果在任何集合上调用它会抛出异常吗?或者两个集合?

两者(全部) - 但前提是任何集合中都没有可用元素。

如果CompleteAdding被调用并且集合仍然有一些项目怎么办,它会抛出异常吗?

不会。如果集合中有可用元素,则会将其从集合中删除并返回给调用者。

结论

显然文档不清楚。那个部分

CompleteAdding()已被调用集合

应该有不同的表述 - 就像

或者任何集合中都没有可用元素并且CompleteAdding()已在所有集合上调用

基本原理

嗯,我知道依赖实现并不是一个好的做法,但是当文档不清楚时,实现是我能想到的唯一可靠的官方来源。所以取参考源码,同时TakeFromAny调用TryTakeFromAny私有方法TryTakeFromAnyCore。它从以下内容开始:

ValidateCollectionsArray(collections, false);
Run Code Online (Sandbox Code Playgroud)

false这是一个bool名为 的参数isAddOperation,在 中使用ValidateCollectionsArray如下:

if (isAddOperation && collections[i].IsAddingCompleted)
{
    throw new ArgumentException(
        SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
}
Run Code Online (Sandbox Code Playgroud)

ArgumentException这是可能被调用的地方之一CompleteAdding()。正如我们所看到的,情况并非如此(问题#1)。

然后继续执行以下“快速路径”:

//try the fast path first
for (int i = 0; i < collections.Length; i++)
{
    // Check if the collection is not completed, and potentially has at least one element by checking the semaphore count
    if (!collections[i].IsCompleted && collections[i].m_occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item))
        return i;
}
Run Code Online (Sandbox Code Playgroud)

这证明了问题#2的答案。

最后,如果任何集合中都没有可用元素,则实现通过调用另一个私有方法来采取“慢速路径” TryTakeFromAnyCoreSlow,以下注释是实现行为的基本解释:

//Loop until one of these conditions is met:
// 1- The operation is succeeded
// 2- The timeout expired for try* versions
// 3- The external token is cancelled, throw
// 4- The operation is TryTake and all collections are marked as completed, return false
// 5- The operation is Take and all collection are marked as completed, throw
Run Code Online (Sandbox Code Playgroud)

我们两个问题的答案都在案例#1 和案例#5 中(注意“ all”一词)。TakeFromAny顺便说一句,它还显示了和之间的唯一区别TryTakeFromAny- 案例 #4 与 #5,即throwvs return -1