Parallel.Invoke - 动态创建更多'线程'

Sle*_*Bos 5 c# task-parallel-library

我正在教自己Parallel.Invoke,以及一般的并行处理,用于当前项目.我需要向正确的方向努力,以了解如何根据需要动态\智能地分配更多并行"线程".

举个例子.假设您正在解析大型日志文件.这涉及从文件中读取,对返回的行进行某种解析,最后写入数据库.

所以对我来说,这是一个可以从并行处理中受益的典型问题.

作为简单的第一遍,以下代码实现了这一点.

Parallel.Invoke(
  ()=> readFileLinesToBuffer(),
  ()=> parseFileLinesFromBuffer(),
  ()=> updateResultsToDatabase()    
);
Run Code Online (Sandbox Code Playgroud)

在幕后

  1. readFileLinesToBuffer()读取每一行并存储到缓冲区.
  2. parseFileLinesFromBuffer出现并使用缓冲区中的行,然后让它们将它们放在另一个缓冲区中,以便updateResultsToDatabase()可以出现并使用此缓冲区.

因此,所显示的代码假定这三个步骤中的每一个都使用相同数量的时间\资源,但是假设parseFileLinesFromBuffer()是一个长时间运行的进程,所以不要只运行其中一个方法,而是要并行运行两个.

如何让代码根据它可能感知到的任何瓶颈智能地决定这样做?

从概念上讲,我可以看到一些监视缓冲区大小的方法是如何工作的,产生一个新的"线程"来以更高的速率使用缓冲区,例如......但我认为在将TPL库放在一起时已经考虑过这种类型的问题.

一些示例代码会很棒,但我真的只需要了解我接下来要调查的概念.看起来可能是System.Threading.Tasks.TaskScheduler持有密钥?

Jup*_*aol 4

您尝试过反应式扩展吗?

http://msdn.microsoft.com/en-us/data/gg577609.aspx

Rx是微软的一项新技术,其重点如官方网站所述:

Reactive Extensions (Rx)... ...是一个使用可观察集合和 LINQ 样式查询运算符组成异步和基于事件的程序的库。

您可以将其下载为 Nuget 包

https://nuget.org/packages/Rx-Main/1.0.11226

由于我目前正在学习 Rx,所以我想采用这个示例并为其编写代码,我最终得到的代码实际上并不是并行执行的,而是完全异步的,并保证源代码行按顺序执行。

也许这不是最好的实现,但是就像我说的我正在学习Rx,(线程安全应该是一个很好的改进)

这是我用来从后台线程返回数据的 DTO

class MyItem
{
    public string Line { get; set; }
    public int CurrentThread { get; set; }
}
Run Code Online (Sandbox Code Playgroud)

这些是完成实际工作的基本方法,我用一个简单的方法模拟时间Thread.Sleep,并返回用于执行每个方法的线程Thread.CurrentThread.ManagedThreadId。注意它的定时器ProcessLine是4秒,这是最耗时的操作

private IEnumerable<MyItem> ReadLinesFromFile(string fileName)
{
    var source = from e in Enumerable.Range(1, 10)
                 let v = e.ToString()
                 select v;

    foreach (var item in source)
    {
        Thread.Sleep(1000);
        yield return new MyItem { CurrentThread = Thread.CurrentThread.ManagedThreadId, Line = item };
    }
}

private MyItem UpdateResultToDatabase(string processedLine)
{
    Thread.Sleep(700);
    return new MyItem { Line = "s" + processedLine, CurrentThread = Thread.CurrentThread.ManagedThreadId };
}

private MyItem ProcessLine(string line)
{
    Thread.Sleep(4000);
    return new MyItem { Line = "p" + line, CurrentThread = Thread.CurrentThread.ManagedThreadId };
}
Run Code Online (Sandbox Code Playgroud)

下面的方法我只是用它来更新UI

private void DisplayResults(MyItem myItem, Color color, string message)
{
    this.listView1.Items.Add(
        new ListViewItem(
            new[]
            {
                message, 
                myItem.Line ,
                myItem.CurrentThread.ToString(), 
                Thread.CurrentThread.ManagedThreadId.ToString()
            }
        )
        {
            ForeColor = color
        }
    );
}
Run Code Online (Sandbox Code Playgroud)

最后这是调用 Rx API 的方法

private void PlayWithRx()
{
    // we init the observavble with the lines read from the file
    var source = this.ReadLinesFromFile("some file").ToObservable(Scheduler.TaskPool);

    source.ObserveOn(this).Subscribe(x =>
    {
        // for each line read, we update the UI
        this.DisplayResults(x, Color.Red, "Read");

        // for each line read, we subscribe the line to the ProcessLine method
        var process = Observable.Start(() => this.ProcessLine(x.Line), Scheduler.TaskPool)
            .ObserveOn(this).Subscribe(c =>
            {
                // for each line processed, we update the UI
                this.DisplayResults(c, Color.Blue, "Processed");

                // for each line processed we subscribe to the final process the UpdateResultToDatabase method
                // finally, we update the UI when the line processed has been saved to the database
                var persist = Observable.Start(() => this.UpdateResultToDatabase(c.Line), Scheduler.TaskPool)
                    .ObserveOn(this).Subscribe(z => this.DisplayResults(z, Color.Black, "Saved"));
            });
    });
}
Run Code Online (Sandbox Code Playgroud)

该过程完全在后台运行,这是生成的输出:

在此输入图像描述