(Sticking with the common example of fetching many web pages asynchronously.)
How would I asynchronously fire off many (hundreds of) web page requests and then wait for all of them to complete before moving on to the next step? Async.AsParallel processes only a few requests at a time, throttled by the number of CPU cores, but fetching a web page is not a CPU-bound operation. Unsatisfied with the speedup from Async.AsParallel, I'm looking for alternatives.
I'm trying to connect the dots between Async.StartAsTask and Task[].WaitAll. Instinctively I wrote the following code, but it doesn't compile.
let processItemsConcurrently (items : int seq) =
    let tasks = items |> Seq.map (fun item -> Async.StartAsTask(fetchAsync item))
    Tasks.Task.WaitAll(tasks)
How would you go about it?
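For reference, here is a minimal sketch of what the snippet above would need in order to compile: materialize the lazy sequence into an array and upcast each Task<'T> to Task before calling Task.WaitAll. The fetchAsync below is a hypothetical stand-in for the one referenced in the question.

// Minimal sketch only; fetchAsync is a hypothetical placeholder.
open System.Net
open System.Threading.Tasks
open Microsoft.FSharp.Control.WebExtensions

let fetchAsync (item : int) =
    async {
        // placeholder request; the real fetchAsync would do the actual work
        let req = WebRequest.Create(sprintf "http://example.com/%d" item)
        use! resp = req.AsyncGetResponse()
        return ()
    }

let processItemsConcurrently (items : int seq) =
    let tasks =
        items
        |> Seq.map (fun item -> Async.StartAsTask(fetchAsync item) :> Task)
        |> Seq.toArray                    // Task.WaitAll wants a concrete Task[]
    Task.WaitAll(tasks)                   // block until every request completes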
Async.Parallel is almost certainly right here. I'm not sure what you're dissatisfied with; the strength of F# asyncs lies in asynchronous computations, as opposed to task-parallel CPU-bound work (which is better suited to Tasks and the .NET 4.0 TPL). Here's a full example:
open System.Diagnostics
open System.IO
open System.Net
open Microsoft.FSharp.Control.WebExtensions

let sites = [|
    "http://bing.com"
    "http://google.com"
    "http://cnn.com"
    "http://stackoverflow.com"
    "http://yahoo.com"
    "http://msdn.com"
    "http://microsoft.com"
    "http://apple.com"
    "http://nfl.com"
    "http://amazon.com"
    "http://ebay.com"
    "http://expedia.com"
    "http://twitter.com"
    "http://reddit.com"
    "http://hulu.com"
    "http://youtube.com"
    "http://wikipedia.org"
    "http://live.com"
    "http://msn.com"
    "http://wordpress.com"
    |]

let print s =
    // careful, don't create a synchronization bottleneck by printing
    //printf "%s" s
    ()

let printSummary info fullTimeMs =
    Array.sortInPlaceBy (fun (i,_,_) -> i) info
    // for i, size, time in info do
    //     printfn "%2d %7d %5d" i size time
    let longest = info |> Array.map (fun (_,_,time) -> time) |> Array.max
    printfn "longest request took %dms" longest
    let bytes = info |> Array.sumBy (fun (_,size,_) -> float size)
    let seconds = float fullTimeMs / 1000.
    printfn "sucked down %7.2f KB/s" (bytes / 1024.0 / seconds)

let FetchAllSync() =
    let allsw = Stopwatch.StartNew()
    let info =
        sites
        |> Array.mapi (fun i url ->
            let sw = Stopwatch.StartNew()
            print "S"
            let req = WebRequest.Create(url)
            use resp = req.GetResponse()
            use stream = resp.GetResponseStream()
            use reader = new StreamReader(stream,
                                          System.Text.Encoding.UTF8, true, 4096)
            print "-"
            let contents = reader.ReadToEnd()
            print "r"
            i, contents.Length, sw.ElapsedMilliseconds)
    let time = allsw.ElapsedMilliseconds
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

let FetchAllAsync() =
    let allsw = Stopwatch.StartNew()
    let info =
        sites
        |> Array.mapi (fun i url ->
            async {
                let sw = Stopwatch.StartNew()
                print "S"
                let req = WebRequest.Create(url)
                use! resp = req.AsyncGetResponse()
                use stream = resp.GetResponseStream()
                use reader = new AsyncStreamReader(stream, // F# PowerPack
                                                   System.Text.Encoding.UTF8, true, 4096)
                print "-"
                let! contents = reader.ReadToEnd() // in F# PowerPack
                print "r"
                return i, contents.Length, sw.ElapsedMilliseconds
            })
        |> Async.Parallel
        |> Async.RunSynchronously
    let time = allsw.ElapsedMilliseconds
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

// By default, I think .NET limits you to 2 open connections at once
ServicePointManager.DefaultConnectionLimit <- sites.Length

for i in 1..3 do // to warmup and show variance
    let time1,r1 = FetchAllSync()
    printfn "Sync took %dms, result was %d" time1 r1
    let time2,r2 = FetchAllAsync()
    printfn "Async took %dms, result was %d (speedup=%2.2f)"
        time2 r2 (float time1/ float time2)
    printfn ""
On my 4-core box, this consistently gives close to a 4x speedup.
EDIT
In response to your comment, I've updated the code. You're right in that, as I add more sites, I'm not seeing the additional speedup I expected (it stays steady at about 4x). I've started adding some debug output above and will keep investigating to see whether something else is throttling the connections...
EDIT
I've edited the code again. Well, I found what may be the bottleneck. Here's the implementation of AsyncReadToEnd in the PowerPack:
type System.IO.StreamReader with
    member s.AsyncReadToEnd () =
        FileExtensions.UnblockViaNewThread (fun () -> s.ReadToEnd())
In other words, it just blocks a thread-pool thread and reads synchronously. Argh! Let me see if I can work around that.
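For contrast, here is a rough sketch (not the PowerPack's code) of what a genuinely asynchronous read-to-end can look like, built on the Stream.AsyncRead extension that ships in FSharp.Core. Decoding each chunk separately is a deliberate simplification; a production version would use a System.Text.Decoder to handle multi-byte characters that straddle chunk boundaries.

// Sketch only: read in chunks via Stream.AsyncRead instead of blocking a thread.
open System.IO
open System.Text

let asyncReadToEnd (stream : Stream) (encoding : Encoding) =
    let buffer = Array.zeroCreate 4096
    let sb = StringBuilder()
    let rec loop () = async {
        // completes without tying up a thread while waiting on network I/O
        let! read = stream.AsyncRead(buffer, 0, buffer.Length)
        if read = 0 then
            return sb.ToString()                              // end of stream
        else
            sb.Append(encoding.GetString(buffer, 0, read)) |> ignore
            return! loop () }
    loop ()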
EDIT
Ok, the AsyncStreamReader in the PowerPack does the right thing, and I'm using it now.
However, the key issue appears to be variance.
When you hit, say, cnn.com, a lot of the time the result comes back in something like 500ms. But every so often you get a request that takes 4s, and that can of course kill the apparent asynchrony, since the overall time is the time of the unluckiest request.
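To put rough numbers on that (purely illustrative, not measured): with twenty requests where nineteen take 500ms and one takes 4s, even perfect overlap is bounded by the slowest request, so the best possible speedup is only about 3.4x.

// Illustrative arithmetic only, with made-up timings.
let times = Array.append (Array.create 19 500.0) [| 4000.0 |]  // ms, hypothetical
let sequentialMs = Array.sum times       // 13500 ms if run one after another
let parallelFloorMs = Array.max times    // 4000 ms even with perfect overlap
let maxSpeedup = sequentialMs / parallelFloorMs
printfn "speedup is capped at %.1fx" maxSpeedup                // ~3.4x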
Running the program above, I see speedups ranging from about 2.5x to 9x on my 2-core box at home, but it varies a lot. There may still be some bottleneck in the program that I've missed, but I think the variance of the web may account for everything I'm seeing at this point.