将动态创建的ActionBlocks链接到BufferBlock

Dim*_*tri 5 c# task-parallel-library tpl-dataflow

我不确定这是否可能,但如果是的话,我可能做得不对.假设我有一个链接到许多消费者的共享缓冲区(ActionBlocks).每个使用者应该使用满足用于将其链接到缓冲区的谓词的数据.例如,ActionBlock1应该使用满足的数字x => x % 5 == 0,ActionBlock2应该只消耗x => x % 5 == 1等.

这是我得到的:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
    var productionQueue = new BufferBlock<int>();

    for (int i = 0; i < NumProductionLines; i++)
    {
        ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num));

        productionQueue.LinkTo(productionLine, x => x % NumProductionLines == i);
    }

    return productionQueue;
}
Run Code Online (Sandbox Code Playgroud)

然后我打电话给:

Random rnd = new Random();

ITargetBlock<int> temp = BuildPipeline(5);

while (true)
{
    temp.Post(rnd.Next(255));
}
Run Code Online (Sandbox Code Playgroud)

但这不起作用.控制台中不显示任何输出.如果我修改BuildPipeline方法为:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
    var productionQueue = new BufferBlock<int>();

    ActionBlock<int> productionLine1 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 1, num));
    ActionBlock<int> productionLine2 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 2, num));
    ActionBlock<int> productionLine3 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 3, num));
    ActionBlock<int> productionLine4 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 4, num));
    ActionBlock<int> productionLine5 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 5, num));

    productionQueue.LinkTo(productionLine1, x => x % 5 == 0);
    productionQueue.LinkTo(productionLine2, x => x % 5 == 1);
    productionQueue.LinkTo(productionLine3, x => x % 5 == 2);
    productionQueue.LinkTo(productionLine4, x => x % 5 == 3);
    productionQueue.LinkTo(productionLine5, x => x % 5 == 4);

    return productionQueue;
}
Run Code Online (Sandbox Code Playgroud)

代码完成了预期的工作.

有人可以阐明为什么动态创建和链接动作块不起作用?

PS如果我在ITargetBlock<int> temp = BuildPipeline(5);temp确实显示5个目标链接到缓冲区之后立即进入代码.每个目标的ID都不同.

提前致谢

编辑:添加svick建议的更改,但仍然没有好处:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
    var productionQueue = new BufferBlock<int>();
    var opt = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

    for (int i = 0; i < NumProductionLines; i++)
    {
        ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num));

        int j = i;
        productionQueue.LinkTo(productionLine, x => x % NumProductionLines == j);
    }

    ActionBlock<int> discardedLine = new ActionBlock<int>(num => Console.WriteLine("Discarded: {0}", num));
    productionQueue.LinkTo(discardedLine);

    return productionQueue;
}
Run Code Online (Sandbox Code Playgroud)

现在只有第二条生产线处理数据(满足x%5 == 1谓词的数据).并且数据不满足谓词,这意味着我得到的数字以9和7结尾.

编辑:工作代码如下所示:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
    var productionQueue = new BufferBlock<int>();
    var opt = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

    for (int i = 0; i < NumProductionLines; i++)
    {
        int j = i;
        ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", j + 1, num));

        productionQueue.LinkTo(productionLine, x => x % NumProductionLines == j);
    }

    productionQueue.LinkTo(DataflowBlock.NullTarget<int>());

    return productionQueue;
}
Run Code Online (Sandbox Code Playgroud)

svi*_*ick 5

问题是在第一个版本中,您为每个目标块使用相同的谓词.换句话说,谓词不依赖于i.

但即使它确实如此,您的代码也无法工作,因为i变量在谓词之间共享,因此它们都将使用最后一个值.解决方法是复制i到局部变量并在谓词中使用它.

代码可能如下所示:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
    var productionQueue = new BufferBlock<int>();

    for (int i = 0; i < NumProductionLines; i++)
    {
        int iCopy = i;

        ActionBlock<int> productionLine = new ActionBlock<int>(
            num => Console.WriteLine("Processed by line {0}: {1}", iCopy + 1, num));

        productionQueue.LinkTo(
            productionLine, x => x % NumProductionLines == iCopy);
    }

    return productionQueue;
}
Run Code Online (Sandbox Code Playgroud)

如果你问为什么你的代码至少不处理x % 5 == 1数字,那是因为随机生成器可能会生成一个与该谓词不匹配的数字,因此没有一个ActionBlocks会接受它.因此,该数字将保留在源块的输出队列中,其他数字将无法通过.

如果在您的实际代码中,类似的情况可能发生并且您想要丢弃所有不适合任何谓词的数字,那么在将源块链接到所有谓词后,您可以将源块链接到不执行任何操作的块并接受任何操作你有用的块:

productionQueue.LinkTo(DataflowBlock.NullTarget<int>());
Run Code Online (Sandbox Code Playgroud)