如何确保数据流块仅按需创建线程?

Jer*_*vel 19 c# multithreading asynchronous async-await tpl-dataflow

我使用TPL Dataflow API编写了一个小管道,它从多个线程接收数据并对它们执行处理.

设置1

当我为每个块配置它使用MaxDegreeOfParallelism = Environment.ProcessorCount(8在我的情况下)时,我注意到它填充多个线程中的缓冲区并且处理第二个块不会开始直到所有线程都收到+ - 1700个元素.你可以在这里看到这个.

设置2

当我设置MaxDegreeOfParallelism = 1然后我注意到在一个线程上接收所有元素并且在接收到+40个元素之后处理发送已经开始.数据在这里.

设置3

当我设置MaxDegreeOfParallelism = 1并在发送每个输入之前引入1000ms的延迟时,我注意到元素在收到后立即发送,并且每个接收的元素都放在一个单独的线程上.数据在这里.


到目前为止的设置.我的问题如下:

  1. 当我比较设置1和2时,我注意到与并行相比,在串行完成时处理元素的启动速度要快得多(即使考虑到并行具有8倍的线程数).是什么导致这种差异?

  2. 由于这将在ASP.NET环境中运行,因此我不想生成不必要的线程,因为它们都来自单个线程池.如设置3所示,即使只有少量数据,它仍会在多个线程上传播.这也是令人惊讶的,因为从设置1我会假设数据在线程上顺序传播(注意前50个元素都是如何进入线程16).我可以确保它只按需创建新线程吗?

  3. 还有另一个概念称为BufferBlock<T>.如果TransformBlock<T>已经排队输入,那么在我的管道(ReceiveElement)中交换第一步的实际区别是BufferBlock什么?


class Program
{
    static void Main(string[] args)
    {
        var dataflowProcessor = new DataflowProcessor<string>();
        var amountOfTasks = 5;
        var tasks = new Task[amountOfTasks];

        for (var i = 0; i < amountOfTasks; i++)
        {
            tasks[i] = SpawnThread(dataflowProcessor, $"Task {i + 1}");
        }

        foreach (var task in tasks)
        {
            task.Start();
        }

        Task.WaitAll(tasks);
        Console.WriteLine("Finished feeding threads"); // Needs to use async main
        Console.Read();
    }

    private static Task SpawnThread(DataflowProcessor<string> dataflowProcessor, string taskName)
    {
        return new Task(async () =>
        {
            await FeedData(dataflowProcessor, taskName);
        });
    }

    private static async Task FeedData(DataflowProcessor<string> dataflowProcessor, string threadName)
    {
        foreach (var i in Enumerable.Range(0, short.MaxValue))
        {
            await Task.Delay(1000); // Only used for the delayedSerialProcessing test
            dataflowProcessor.Process($"Thread name: {threadName}\t Thread ID:{Thread.CurrentThread.ManagedThreadId}\t Value:{i}");
        }
    }
}


public class DataflowProcessor<T>
{
    private static readonly ExecutionDataflowBlockOptions ExecutionOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };

    private static readonly TransformBlock<T, T> ReceiveElement = new TransformBlock<T, T>(element =>
    {
        Console.WriteLine($"Processing received element in thread {Thread.CurrentThread.ManagedThreadId}");
        return element;
    }, ExecutionOptions);

    private static readonly ActionBlock<T> SendElement = new ActionBlock<T>(element =>
    {
        Console.WriteLine($"Processing sent element in thread {Thread.CurrentThread.ManagedThreadId}");
        Console.WriteLine(element);
    }, ExecutionOptions);

    static DataflowProcessor()
    {
        ReceiveElement.LinkTo(SendElement);

        ReceiveElement.Completion.ContinueWith(x =>
        {
            if (x.IsFaulted)
            {
                ((IDataflowBlock) ReceiveElement).Fault(x.Exception);
            }
            else
            {
                ReceiveElement.Complete();
            }
        });
    }


    public void Process(T newElement)
    {      
        ReceiveElement.Post(newElement);
    }
}
Run Code Online (Sandbox Code Playgroud)

VMA*_*Atm 9

在将解决方案部署到ASP.NET环境之前,建议您更改体系结构:IIS可以在请求处理后将线程挂起在ASP.NET中供其自己使用,这样您的任务可能就无法完成。更好的方法是创建一个单独的Windows服务守护程序,以处理您的数据流。

现在回到TPL数据流。

我喜欢TPL数据流库,但是它的文档实在是一团糟。
我发现的唯一有用的文档是TPL Dataflow简介

其中有一些提示可能会有所帮助,尤其是有关配置设置的提示(我建议您TaskScheduler使用自己的TheadPool实现和MaxMessagesPerTask选项来研究实现自己的实现):

内置的数据流模块是可配置的,并提供了对模块如何以及在何处执行工作的大量控制。这是可供开发人员使用的一些关键旋钮,所有这些旋钮均通过DataflowBlockOptions类及其派生类型(ExecutionDataflowBlockOptionsGroupingDataflowBlockOptions)公开,可以在构造时将其实例提供给块。

  • @ i3arnon提到的TaskScheduler定制:

    默认情况下,数据流阻止将工作计划到TaskScheduler.Default,以.NET的内部工作为目标ThreadPool

  • 最大并行度

    默认为1,表示一次只能在一个块中发生一件事情。如果设置为大于的值1,则该块可以同时处理该数量的消息。如果设置为DataflowBlockOptions.Unbounded (-1),则可以同时处理任意数量的消息,而最大数量由数据流块针对的基础调度程序自动管理。请注意,这MaxDegreeOfParallelism是最大值,而不是必需条件。

  • MaxMessagesPerTask

    TPL Dataflow专注于效率和控制。在两者之间需要进行折衷的情况下,系统会努力提供质量默认值,但也使开发人员能够根据特定情况自定义行为。这样的例子之一就是性能与公平之间的权衡。默认情况下,数据流块会尝试最小化处理所有数据所需的任务对象的数量。这提供了非常有效的执行;只要一个块具有可用于处理的数据,该块的任务将保留以处理可用数据,仅在没有更多数据可用时退役(直到再次有数据可用,此时将启动更多任务)。但是,这可能导致公平问题。如果系统当前已饱和处理给定一组块中的数据,然后数据到达其他块,则这些后面的块将需要等待第一个块完成处理才能开始,否则就有可能过度订阅系统。对于给定情况,这可能是正确的行为,也可能不是正确的行为。为了解决这个问题,该MaxMessagesPerTask选项存在。默认为DataflowBlockOptions.Unbounded (-1),表示没有最大值。但是,如果设置为正数,则该数字将代表给定块可以使用单个任务处理的最大消息数。一旦达到该限制,该块必须退出该任务,并将其替换为副本以继续处理。这些副本相对于计划给调度程序的所有其他任务都得到了公平对待,从而允许块在它们之间实现一定程度的公平性。在极端情况下,如果MaxMessagesPerTask将其设置为1,则每条消息将使用一个任务,从而以可能需要更多任务的潜在代价来实现最终的公平。

  • MaxNumberOfGroups

    分组块能够跟踪它们产生了多少个分组,并在生成该数目的分组后自动完成自身(拒绝进一步提供的消息)。默认情况下,组数为DataflowBlockOptions.Unbounded(-1),但可以将其显式设置为大于1的值。

  • CancellationToken

    在数据流块的生存期内将监视此令牌。如果取消请求在块完成之前到达,则该块将尽可能有礼貌而迅速地停止运行。

  • 贪婪

    默认情况下,目标块是贪婪的,并且希望将所有数据提供给它们。

  • 有界能力

    这是该块可以存储并在任何一次飞行中的项目数的限制。

  • 问题不在于文档,因为数据流非常简单。问题实际上是在接受它是如此简单并且不需要任何特殊的技巧或设置。Stephen Cleary还撰写了一系列[介绍性博客文章](http://blog.stephencleary.com/2012/09/introduction-to-dataflow-part-1.html) (2认同)
  • @PanagiotisKanavos不同意:操作指南并没有说很多关于块的设置,而我链接的文档却说得这么多。Stephen Cleary写了一篇很棒的“介绍”帖子,而不是OP需要超越基本的自定义。 (2认同)