使用Async CTP同时下载HTML页面

Max*_*ich 3 .net c# multithreading async-await async-ctp

尝试使用Async CTP编写HTML爬虫我遇到了如何编写一个无递归方法来实现这一点.

这是我到目前为止的代码.

private readonly ConcurrentStack<LinkItem> _LinkStack;
private readonly Int32 _MaxStackSize;
private readonly WebClient client = new WebClient();

Func<string, string, Task<List<LinkItem>>> DownloadFromLink = async (BaseURL, uri) => 
{
    string html = await client.DownloadStringTaskAsync(uri);
    return LinkFinder.Find(html, BaseURL);
};

Action<LinkItem> DownloadAndPush = async (o) => 
{
    List<LinkItem> result = await DownloadFromLink(o.BaseURL, o.Href);
    if (this._LinkStack.Count() + result.Count <= this._MaxStackSize)
    {
        this._LinkStack.PushRange(result.ToArray());
        o.Processed = true;
    }  
};

Parallel.ForEach(this._LinkStack, (o) => 
{
    DownloadAndPush(o);
});
Run Code Online (Sandbox Code Playgroud)

但显然这并不像我希望的那样有效,因为在Parallel.ForEach执行第一次(也是唯一的迭代)的时候我只有一个项目.我能想到的最简单的方法是ForEach递归,但我不能(我不认为)这样做,因为我会快速耗尽堆栈空间.

任何人都可以指导我如何重构这个代码,创建我所描述的递归延续,添加项目直到MaxStackSize达到或系统内存不足?

svi*_*ick 10

我认为使用C#5/.Net 4.5做这样的事情的最好方法是使用TPL Dataflow.甚至还有一个关于如何使用它实现网络爬虫的演练.

基本上,您创建一个"块",负责下载一个URL并从中获取链接:

var cts = new CancellationTokenSource();

Func<LinkItem, Task<IEnumerable<LinkItem>>> downloadFromLink =
    async link =>
            {
                // WebClient is not guaranteed to be thread-safe,
                // so we shouldn't use one shared instance
                var client = new WebClient();
                string html = await client.DownloadStringTaskAsync(link.Href);

                return LinkFinder.Find(html, link.BaseURL);
            };

var linkFinderBlock = new TransformManyBlock<LinkItem, LinkItem>(
    downloadFromLink,
    new ExecutionDataflowBlockOptions
    { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token });
Run Code Online (Sandbox Code Playgroud)

您可以设置MaxDegreeOfParallelism为任何所需的值.它最多可以同时下载多少个URL.如果您根本不想限制它,可以将其设置为DataflowBlockOptions.Unbounded.

然后,您创建一个块,以某种方式处理所有下载的链接,例如将它们全部存储在列表中.它还可以决定何时取消下载:

var links = new List<LinkItem>();

var storeBlock = new ActionBlock<LinkItem>(
    linkItem =>
    {
        links.Add(linkItem);
        if (links.Count == maxSize)
            cts.Cancel();
    });
Run Code Online (Sandbox Code Playgroud)

由于我们没有设置MaxDegreeOfParallelism,它默认为1.这意味着使用非线程安全的集合应该没关系.

我们再创建一个块:它将从中获取链接linkFinderBlock,并将其传递给storeBlock和返回linkFinderBlock.

var broadcastBlock = new BroadcastBlock<LinkItem>(li => li);
Run Code Online (Sandbox Code Playgroud)

构造函数中的lambda是一个"克隆函数".如果您愿意,可以使用它来创建项目的克隆,但这里不需要它,因为我们不会修改LinkItem后创建.

现在我们可以将块连接在一起:

linkFinderBlock.LinkTo(broadcastBlock);
broadcastBlock.LinkTo(storeBlock);
broadcastBlock.LinkTo(linkFinderBlock);
Run Code Online (Sandbox Code Playgroud)

然后我们可以通过给第一个项目开始处理linkFinderBlock(或者broadcastBlock,如果你想发送它storeBlock):

linkFinderBlock.Post(firstItem);
Run Code Online (Sandbox Code Playgroud)

最后等到处理完成:

try
{
    linkFinderBlock.Completion.Wait();
}
catch (AggregateException ex)
{
    if (!(ex.InnerException is TaskCanceledException))
        throw;
}
Run Code Online (Sandbox Code Playgroud)