c#.net 4.5 async/multithread?

Kyl*_*yle 28 c# multithreading .net-4.5

我正在编写一个从网页上抓取数据的C#控制台应用程序.

此应用程序将访问大约8000个网页并刮取数据(每页上的数据格式相同).

我现在正在使用它,没有异步方法,也没有多线程.

但是,我需要它更快.它只使用了大约3%-6%的CPU,我想是因为它花时间等待下载html.(WebClient.DownloadString(url))

这是我的程序的基本流程

DataSet alldata;

foreach(var url in the8000urls)
{
    // ScrapeData downloads the html from the url with WebClient.DownloadString
    // and scrapes the data into several datatables which it returns as a dataset.
    DataSet dataForOnePage = ScrapeData(url);

    //merge each table in dataForOnePage into allData
}

// PushAllDataToSql(alldata);
Run Code Online (Sandbox Code Playgroud)

我一直试图多线程,但不知道如何正确开始.我正在使用.net 4.5并且我的理解是异步并且等待4.5以使这更容易编程但我仍然有点迷失.

我的想法是继续制作这条线异步的新线程

DataSet dataForOnePage = ScrapeData(url);
Run Code Online (Sandbox Code Playgroud)

然后当每个人完成时,跑

//merge each table in dataForOnePage into allData
Run Code Online (Sandbox Code Playgroud)

任何人都可以指出我正确的方向如何在.net 4.5 c#中使该行异步,然后让我的合并方法运行完成?

谢谢.

编辑:这是我的ScrapeData方法:

public static DataSet GetProperyData(CookieAwareWebClient webClient, string pageid)
{
    var dsPageData = new DataSet();

    // DOWNLOAD HTML FOR THE REO PAGE AND LOAD IT INTO AN HTMLDOCUMENT
    string url = @"https://domain.com?&id=" + pageid + @"restofurl";
    string html = webClient.DownloadString(url);
    var doc = new HtmlDocument();
    doc.LoadHtml(html );

    // A BUNCH OF PARSING WITH HTMLAGILITY AND STORING IN dsPageData 
    return dsPageData ;
}
Run Code Online (Sandbox Code Playgroud)

cas*_*One 42

如果你想使用asyncawait关键字(虽然你没有,但它们确实在.NET 4.5中更容易),你首先要改变你的ScrapeData方法以使用关键字返回一个Task<T>实例async,如下所示:

async Task<DataSet> ScrapeDataAsync(Uri url)
{
    // Create the HttpClientHandler which will handle cookies.
    var handler = new HttpClientHandler();

    // Set cookies on handler.

    // Await on an async call to fetch here, convert to a data
    // set and return.
    var client = new HttpClient(handler);

    // Wait for the HttpResponseMessage.
    HttpResponseMessage response = await client.GetAsync(url);

    // Get the content, await on the string content.
    string content = await response.Content.ReadAsStringAsync();

    // Process content variable here into a data set and return.
    DataSet ds = ...;

    // Return the DataSet, it will return Task<DataSet>.
    return ds;
}
Run Code Online (Sandbox Code Playgroud)

请注意,您可能希望远离WebClient该类,因为它Task<T>在异步操作中本身不支持..NET 4.5中更好的选择是HttpClient.我选择使用HttpClient上面的.另外,请查看HttpClientHandler该类,特别是您将用于向每个请求发送cookie 的CookieContainer属性.

但是,这意味着您很可能不得不使用await关键字等待另一个异步操作,在这种情况下,很可能是下载页面.您必须定制下载数据的调用以使用异步版本await.

一旦完成,你通常会调用await它,但在这种情况下你不能这样做,因为你会对await变量.在这种情况下,您正在运行循环,因此每次迭代都会重置变量.在这种情况下,最好只将数据存储Task<T>在一个数组中:

DataSet alldata = ...;

var tasks = new List<Task<DataSet>>();

foreach(var url in the8000urls)
{
    // ScrapeData downloads the html from the url with 
    // WebClient.DownloadString
    // and scrapes the data into several datatables which 
    // it returns as a dataset.
    tasks.Add(ScrapeDataAsync(url));
}
Run Code Online (Sandbox Code Playgroud)

有将数据合并的问题allData.为此,您希望在返回的实例上调用该ContinueWith方法,Task<T>并执行将数据添加到的任务allData:

DataSet alldata = ...;

var tasks = new List<Task<DataSet>>();

foreach(var url in the8000urls)
{
    // ScrapeData downloads the html from the url with 
    // WebClient.DownloadString
    // and scrapes the data into several datatables which 
    // it returns as a dataset.
    tasks.Add(ScrapeDataAsync(url).ContinueWith(t => {
        // Lock access to the data set, since this is
        // async now.
        lock (allData)
        {
             // Add the data.
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

然后,你可以等待上使用的所有任务WhenAll方法Taskawait上:

// After your loop.
await Task.WhenAll(tasks);

// Process allData
Run Code Online (Sandbox Code Playgroud)

但是,请注意您有一个foreach,并WhenAll采取IEnumerable<T>实现.这是一个很好的指标,它适合使用LINQ,它是:

DataSet alldata;

var tasks = 
    from url in the8000Urls
    select ScrapeDataAsync(url).ContinueWith(t => {
        // Lock access to the data set, since this is
        // async now.
        lock (allData)
        {
             // Add the data.
        }
    });

await Task.WhenAll(tasks);

// Process allData
Run Code Online (Sandbox Code Playgroud)

如果您愿意,也可以选择不使用查询语法,在这种情况下无关紧要.

请注意,如果含有方法没有被标记为async(因为你是在一个控制台应用程序并等待结果的应用程序终止前),那么你可以简单地调用该Wait方法Task当你调用返回WhenAll:

// This will block, waiting for all tasks to complete, all
// tasks will run asynchronously and when all are done, then the
// code will continue to execute.
Task.WhenAll(tasks).Wait();

// Process allData.
Run Code Online (Sandbox Code Playgroud)

也就是说,重点是,您希望将Task实例收集到序列中,然后在处理之前等待整个序列allData.

但是,我建议在合并之前尝试处理数据,allData如果可以的话; 除非数据处理需要整个 数据处理,否则DataSet通过处理返回获得的数据,可以获得更多的性能提升,而不是等待所有数据返回.


Ste*_*ary 11

您还可以使用TPL Dataflow,它非常适合此类问题.

在这种情况下,您构建一个"数据流网格",然后您的数据流经它.

这个实际上更像是管道而不是"网格".我要分三步:从URL下载(字符串)数据; 将(字符串)数据解析为HTML然后转换为DataSet; 并DataSet融入主人DataSet.

首先,我们创建将进入网格的块:

DataSet allData;
var downloadData = new TransformBlock<string, string>(
  async pageid =>
  {
    System.Net.WebClient webClient = null;
    var url = "https://domain.com?&id=" + pageid + "restofurl";
    return await webClient.DownloadStringTaskAsync(url);
  },
  new ExecutionDataflowBlockOptions
  {
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
  });
var parseHtml = new TransformBlock<string, DataSet>(
  html =>
  {
    var dsPageData = new DataSet();
    var doc = new HtmlDocument();
    doc.LoadHtml(html);

    // HTML Agility parsing

    return dsPageData;
  },
  new ExecutionDataflowBlockOptions
  {
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
  });
var merge = new ActionBlock<DataSet>(
  dataForOnePage =>
  {
    // merge dataForOnePage into allData
  });
Run Code Online (Sandbox Code Playgroud)

然后我们将三个块链接在一起以创建网格:

downloadData.LinkTo(parseHtml);
parseHtml.LinkTo(merge);
Run Code Online (Sandbox Code Playgroud)

接下来,我们开始将数据泵入网格:

foreach (var pageid in the8000urls)
  downloadData.Post(pageid);
Run Code Online (Sandbox Code Playgroud)

最后,我们等待网格中的每个步骤完成(这也将干净地传播任何错误):

downloadData.Complete();
await downloadData.Completion;
parseHtml.Complete();
await parseHtml.Completion;
merge.Complete();
await merge.Completion;
Run Code Online (Sandbox Code Playgroud)

TPL Dataflow的优点在于您可以轻松控制每个部分的并行程度.现在,我已经设置了下载和解析块Unbounded,但您可能想要限制它们.合并块使用默认的最大并行度1,因此合并时不需要锁定.

  • 如果今天有人提出这个问题,我会回答一个基于TPL的解决方案,而不是[我给的那个](http://stackoverflow.com/a/11639434/50776); 它绝对更容易连接所有东西,更清洁. (2认同)
  • TPL Dataflow是一个基于"任务"的异步网格.它实际上并不是TPL的一部分,因为它存在于.NET中,而是由同一个团队开发的附加库(他们也开发了`async`支持类型). (2认同)
  • @iNfinity这是不正确的.它实际上非常接近它的名字.它不必受CPU限制,您可以轻松地将I/O绑定操作作为数据流的一部分.它是将操作分解为块然后将所有块链接在一起,能够控制所有块如何处理并行,缓冲等事情.这一点都没有过分,IMO,一旦你得到它,块非常容易组合在一起,你可以在这些逻辑单元中看到适合TPL的东西. (2认同)