How to achieve Asynchrony rather than Parallelism in F#

Asked by GregC. Tags: performance, f#, asynchronous, .net-4.0, task-parallel-library

(Sticking with the common example of fetching many web pages asynchronously)

How would I asynchronously fire off many (hundreds of) web page requests, and then wait for all of them to complete before moving on to the next step? Async.AsParallel processes only a few requests at a time, governed by the number of cores on the CPU, but fetching a web page is not a CPU-bound operation. Unsatisfied with the speedup from Async.AsParallel, I'm looking for alternatives.

I'm trying to connect the dots between Async.StartAsTask and Task.WaitAll. Instinctively I wrote the following code, but it doesn't compile.

let processItemsConcurrently (items : int seq) = 
  let tasks = items |> Seq.map (fun item -> Async.StartAsTask(fetchAsync item))
  Tasks.Task.WaitAll(tasks) 
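
For reference, a minimal sketch of what a compiling Task-based version could look like (fetchAsync here is a hypothetical stand-in, since the question doesn't show it): Task.WaitAll expects a Task array, so the Task<'T> values have to be upcast to Task and materialized into an array.

open System.Threading.Tasks

// Hypothetical stand-in for the fetch operation assumed by the question.
let fetchAsync (item : int) : Async<unit> = async {
    do! Async.Sleep 100 }

let processItemsConcurrently (items : int seq) =
    let tasks =
        items
        |> Seq.map (fun item -> Async.StartAsTask(fetchAsync item) :> Task)  // upcast Task<unit> to Task
        |> Seq.toArray                                                       // Task.WaitAll wants an array
    Task.WaitAll(tasks)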

How would you do it?

Answered by Brian:

Async.Parallel is almost certainly what you want here. I'm not sure what you're dissatisfied with; the strength of F# asyncs lies more in asynchronous computations than in task-parallel CPU-bound work (which is better suited to Tasks and the .NET 4.0 TPL). Here's a full example:

open System.Diagnostics 
open System.IO
open System.Net
open Microsoft.FSharp.Control.WebExtensions 

let sites = [|
    "http://bing.com"
    "http://google.com"
    "http://cnn.com"
    "http://stackoverflow.com"
    "http://yahoo.com"
    "http://msdn.com"
    "http://microsoft.com"
    "http://apple.com"
    "http://nfl.com"
    "http://amazon.com"
    "http://ebay.com"
    "http://expedia.com"
    "http://twitter.com"
    "http://reddit.com"
    "http://hulu.com"
    "http://youtube.com"
    "http://wikipedia.org"
    "http://live.com"
    "http://msn.com"
    "http://wordpress.com"
    |]

let print s = 
    // careful, don't create a synchronization bottleneck by printing
    //printf "%s" s
    ()

let printSummary info fullTimeMs =
    Array.sortInPlaceBy (fun (i,_,_) -> i) info
//  for i, size, time in info do
//      printfn "%2d  %7d  %5d" i size time
    let longest = info |> Array.map (fun (_,_,time) -> time) |> Array.max
    printfn "longest request took %dms" longest
    let bytes = info |> Array.sumBy (fun (_,size,_) -> float size)
    let seconds = float fullTimeMs / 1000.
    printfn "sucked down %7.2f KB/s" (bytes / 1024.0 / seconds)

let FetchAllSync() =
    let allsw = Stopwatch.StartNew()
    let info = sites |> Array.mapi (fun i url ->
        let sw = Stopwatch.StartNew()
        print "S"
        let req = WebRequest.Create(url) 
        use resp = req.GetResponse()
        use stream = resp.GetResponseStream()
        use reader = new StreamReader(stream,
                            System.Text.Encoding.UTF8, true, 4096) 
        print "-"
        let contents = reader.ReadToEnd()
        print "r"
        i, contents.Length, sw.ElapsedMilliseconds)
    let time = allsw.ElapsedMilliseconds 
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

let FetchAllAsync() =
    let allsw = Stopwatch.StartNew()
    let info = sites |> Array.mapi (fun i url -> async {
        let sw = Stopwatch.StartNew()
        print "S"
        let req = WebRequest.Create(url) 
        use! resp = req.AsyncGetResponse()
        use stream = resp.GetResponseStream()
        use reader = new AsyncStreamReader(stream, // F# PowerPack
                           System.Text.Encoding.UTF8, true, 4096) 
        print "-"
        let! contents = reader.ReadToEnd()  // in F# PowerPack
        print "r"
        return i, contents.Length, sw.ElapsedMilliseconds })
                    |> Async.Parallel 
                    |> Async.RunSynchronously 
    let time = allsw.ElapsedMilliseconds 
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

// By default, I think .NET limits you to 2 open connections at once
ServicePointManager.DefaultConnectionLimit <- sites.Length 

for i in 1..3 do // to warmup and show variance
    let time1,r1 = FetchAllSync()
    printfn "Sync took %dms, result was %d" time1 r1
    let time2,r2 = FetchAllAsync()
    printfn "Async took %dms, result was %d  (speedup=%2.2f)" 
        time2 r2 (float time1/ float time2)
    printfn ""

On my 4-core box, this consistently gives nearly a 4x speedup.

EDIT

In response to your comments, I've updated the code. You're right in that I've added more sites and am not seeing the expected speedup (it still holds steady around 4x). I've started adding some debugging output above and will keep investigating to see whether something else is throttling the connections...

EDIT

I've edited the code again. Ok, I found what may be the bottleneck. Here's the implementation of AsyncReadToEnd in the PowerPack:

type System.IO.StreamReader with
   member s.AsyncReadToEnd () = 
       FileExtensions.UnblockViaNewThread (fun () -> s.ReadToEnd())

In other words, it simply blocks a ThreadPool thread and reads synchronously. Argh! Let me see whether I can work around that.
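
For comparison, here is a minimal sketch of one way to read a response stream to the end without blocking a ThreadPool thread, built on Stream.AsyncRead (this is only an illustration of the non-blocking approach, not the PowerPack code):

open System.IO
open System.Text

// Read all bytes via non-blocking Stream.AsyncRead calls, then decode once
// at the end (this also avoids splitting multi-byte characters across chunks).
let asyncReadToEnd (stream : Stream) (encoding : Encoding) =
    let buffer : byte[] = Array.zeroCreate 4096
    let collected = new MemoryStream()
    let rec loop () = async {
        let! read = stream.AsyncRead(buffer, 0, buffer.Length)
        if read = 0 then
            return encoding.GetString(collected.ToArray())
        else
            collected.Write(buffer, 0, read)
            return! loop () }
    loop ()

In the example above this would take the place of the reader-based read, e.g. let! contents = asyncReadToEnd stream System.Text.Encoding.UTF8.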

EDIT

Ok, the AsyncStreamReader in the PowerPack does the right thing, and I'm using it now.

However, the key issue appears to be variance.

When you hit, say, cnn.com, much of the time the result comes back in about 500ms. But every once in a while you get a request that takes 4s, and that can of course kill the apparent asynchrony, since the overall time is the time of the unluckiest request.

Running the program above, I see speedups ranging from about 2.5x to 9x on my 2-core box at home, but it varies a lot. There may still be some bottleneck in the program that I've missed, but I think the variance of the network may account for everything I'm seeing at this point.