TPL Dataflow仅在完成所有源数据块时保证完成

Mat*_*olf 23 c# concurrency task-parallel-library tpl-dataflow

两个完成转换后,如何重新编写代码完成的代码?我认为完成意味着它被标记为完成并且"出队列"是空的?

public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

        transformBlock2 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("2 input count: " + transformBlock1.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

        processorBlock = new ActionBlock<string>(i =>
            {
                Console.WriteLine(i);
            });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        const int numElements = 100;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.SendAsync(i);
        }

        //mark completion
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();

        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}
Run Code Online (Sandbox Code Playgroud)

我编辑了代码,为每个变换块添加了输入缓冲区计数.显然,所有100个项目都流式传输到每个变换块.但是只要其中一个transformblock完成,处理器块就不再接受任何项目,而不完整的transformblock的输入缓冲区只是刷新输入缓冲区.

svi*_*ick 31

问题正是casperOne在答案中所说的.第一个转换块完成后,处理器块进入"完成模式":它将处理其输入队列中的剩余项目,但不接受任何新项目.

除了将处理器块分成两部分之外,还有一个更简单的修复方法:不设置PropagateCompletion,而是在两个转换块完成时手动设置处理器块的完成:

Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
    .ContinueWith(_ => processorBlock.Complete());
Run Code Online (Sandbox Code Playgroud)


cas*_*One 26

这里的问题是,每次调用方法来链接块以及转换块中的等待时间不同时,都要设置PropagateCompletion属性.LinkTo

界面上的Complete方法文档(强调我的):IDataflowBlock

向IDataflowBlock发出信号,表示它不应该接受也不会产生任何更多的消息,也不会消耗更多的推迟消息.

因为你在每个TransformBlock<TInput, TOutput>实例中错开了等待时间transformBlock2(等待20毫秒)之前完成transformBlock1(等待50毫秒). transformBlock2首先完成,然后发送信号processorBlock,然后说"我不接受任何其他"(并且transformBlock1尚未生成所有消息).

请注意,transformBlock1之前的处理transformBlock1并非绝对保证; 线程池(假设您正在使用默认调度程序)将以不同的顺序处理任务是可行的(但很可能不会,因为一旦完成20毫秒的项目,它将从队列中窃取工作).

您的管道如下所示:

           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          \              /
           processorBlock
Run Code Online (Sandbox Code Playgroud)

为了解决这个问题,你想要一个看起来像这样的管道:

           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          |              |
 processorBlock1   processorBlock2
Run Code Online (Sandbox Code Playgroud)

这是通过创建两个单独的ActionBlock<TInput>实例来完成的,如下所示:

// The action, can be a method, makes it easier to share.
Action<string> a = i => Console.WriteLine(i);

// Create the processor blocks.
processorBlock1 = new ActionBlock<string>(a);
processorBlock2 = new ActionBlock<string>(a);


// Linking
broadCastBlock.LinkTo(transformBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });
Run Code Online (Sandbox Code Playgroud)

然后,您需要等待两个处理器块而不是一个:

Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();
Run Code Online (Sandbox Code Playgroud)

这是一个非常重要的说明; 在创建时ActionBlock<TInput>,默认是将传递给它的实例上的MaxDegreeOfParallelism属性ExecutionDataflowBlockOptions设置为1.

这意味着您传递给Action<T>代理的调用ActionBlock<TInput>是线程安全的,一次只执行一个.

因为您现在有两个 ActionBlock<TInput>指向同一Action<T>委托的实例,所以不能保证线程安全.

如果您的方法是线程安全的,那么您不必执行任何操作(这将允许您将MaxDegreeOfParallelism属性设置为DataflowBlockOptions.Unbounded,因为没有理由阻止).

如果它不是线程安全的,并且您需要保证它,则需要求助于传统的同步原语,如lock语句.

在这种情况下,你会这样做(尽管显然不需要,因为上的WriteLine方法是线程安全的):Console

// The lock.
var l = new object();

// The action, can be a method, makes it easier to share.
Action<string> a = i => {
    // Ensure one call at a time.
    lock (l) Console.WriteLine(i);
};

// And so on...
Run Code Online (Sandbox Code Playgroud)

  • 如果对两个操作块使用相同的[`ExclusiveScheduler`](http://msdn.microsoft.com/en-us/library/system.threading.tasks.concurrentexclusiveschedulerpair.exclusivescheduler),则可以轻松避免锁定. (3认同)

pkt*_*pkt 8

svick的答案的补充:为了与使用PropagateCompletion选项获得的行为一致,您还需要在前一个块出现故障的情况下转发异常.像下面这样的扩展方法也会解决这个问题:

public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources) {
    if (target == null) return;
    if (sources.Length == 0) { target.Complete(); return; }
    Task.Factory.ContinueWhenAll(
        sources.Select(b => b.Completion).ToArray(),
        tasks => {
            var exceptions = (from t in tasks where t.IsFaulted select t.Exception).ToList();
            if (exceptions.Count != 0) {
                target.Fault(new AggregateException(exceptions));
            } else {
                target.Complete();
            }
        }
    );
}
Run Code Online (Sandbox Code Playgroud)