Expensive asynchronous reading of a response stream

Joe*_*gen 1 f# asynchronous httpwebrequest

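I have been trying to learn F# for the past couple of days, and I keep running into something that perplexes me. My "learning project" is a screen scraper for some data I'm interested in manipulating.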

The F# PowerPack has a call, Stream.AsyncReadToEnd. I did not want to pull in the PowerPack just for that single call, so I took a look at how they implemented it.

module Downloader =
    open System
    open System.IO
    open System.Net
    open System.Collections

    type public BulkDownload(uriList : IEnumerable) =
        member this.UriList with get() = uriList

        member this.ParalellDownload() =
            let Download (uri : Uri) = async {
                let UnblockViaNewThread f = async {
                    do! Async.SwitchToNewThread()
                    let res = f()
                    do! Async.SwitchToThreadPool()
                    return res }

                let request = HttpWebRequest.Create(uri)
                let! response = request.AsyncGetResponse()
                use responseStream = response.GetResponseStream()
                use reader = new StreamReader(responseStream)
                let! contents = UnblockViaNewThread (fun() -> reader.ReadToEnd())
                return uri, contents.ToString().Length }

            this.UriList
            |> Seq.cast
            |> Seq.map Download
            |> Async.Parallel
            |> Async.RunSynchronously

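They have that UnblockViaNewThread function. Is that really the only way to read the response stream asynchronously? Isn't creating a new thread really expensive (I've seen the "~1 MB of memory" figure thrown around all over the place)? Is there a better way to do this? Is this what really happens in every Async* call (the ones I can let!)?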

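EDIT: I followed Tomas's suggestions and came up with something that does not depend on the F# PowerPack. Here it is. It still needs error handling, but it requests each URL asynchronously and downloads it into a byte array.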

namespace Downloader
open System
open System.IO
open System.Net
open System.Collections

type public BulkDownload(uriList : IEnumerable) =
    member this.UriList with get() = uriList

    member this.ParalellDownload() =
        let Download (uri : Uri) = async {
            let processStreamAsync (stream : Stream) = async {
                // Accumulate the response in memory, 4 KB at a time.
                use outputStream = new MemoryStream()
                let buffer = Array.zeroCreate<byte> 0x1000
                let completed = ref false
                while not (!completed) do
                    // AsyncRead does not tie up a thread while waiting for data.
                    let! bytesRead = stream.AsyncRead(buffer, 0, buffer.Length)
                    if bytesRead = 0 then
                        completed := true   // end of stream
                    else
                        outputStream.Write(buffer, 0, bytesRead)
                // The caller disposes the stream through its own `use` binding.
                return outputStream.ToArray() }

            let request = HttpWebRequest.Create(uri)
            let! response = request.AsyncGetResponse()
            use responseStream = response.GetResponseStream()
            let! contents = processStreamAsync responseStream
            return uri, contents.Length }

        this.UriList
        |> Seq.cast
        |> Seq.map Download
        |> Async.Parallel
        |> Async.RunSynchronously

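    // Cast to seq<obj> so String.Join joins the items rather than formatting the collection object itself.
    override this.ToString() = String.Join(", ", this.UriList |> Seq.cast<obj>)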

Tom*_*cek 9

I think an AsyncReadToEnd that just calls ReadToEnd synchronously on a separate thread is wrong.

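The F# PowerPack also contains an AsyncStreamReader type with a proper asynchronous implementation of stream reading. Its ReadLine method (asynchronously) returns the next line and downloads only a few chunks from the source stream (using the asynchronous ReadAsync rather than running on a background thread).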

let processStreamAsync stream = async { 
  use asyncReader = new AsyncStreamReader(stream)
  let completed = ref false
  while not (!completed) do 
    // Asynchronously get the next line
    let! nextLine = asyncReader.ReadLine()
    if nextLine = null then completed := true
    else
       (* process the next line *)
       () }
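On the question of whether every Async* call needs its own thread: a non-blocking read can be built on the stream's BeginRead/EndRead pair instead. The following is a minimal sketch of that idea, not the PowerPack source; the helper name readChunkAsync is hypothetical, while Async.FromBeginEnd is a standard FSharp.Core function.

```fsharp
open System.IO

// Hypothetical helper (not from the PowerPack): wraps BeginRead/EndRead
// in an async workflow. No thread is created or parked while the read is
// pending; the workflow resumes when the completion callback fires.
let readChunkAsync (stream : Stream) (buffer : byte[]) : Async<int> =
    Async.FromBeginEnd(
        (fun (callback, state) ->
            stream.BeginRead(buffer, 0, buffer.Length, callback, state)),
        stream.EndRead)
```

It can be awaited like any other async value, for example let! bytesRead = readChunkAsync stream buffer, and it returns 0 at the end of the stream.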

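If you want to download the whole content as a string (instead of processing it line by line), you can use the ReadToEnd method of AsyncStreamReader. This is a proper asynchronous implementation: it starts downloading a block of data (asynchronously) and repeats this without blocking.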

async { 
  use asyncReader = new AsyncStreamReader(stream)
  return! asyncReader.ReadToEnd() }
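If depending on the PowerPack is not an option, a similar result can probably be had from the framework itself. This sketch assumes .NET 4.5 or later, where StreamReader.ReadToEndAsync is available, and bridges the returned task into an async workflow with Async.AwaitTask; the function name readToEndAsync is made up for illustration.

```fsharp
open System.IO

// Assumption: .NET 4.5+ (StreamReader.ReadToEndAsync).
// Hypothetical helper: reads the whole stream as text without
// dedicating a thread to the wait.
let readToEndAsync (stream : Stream) : Async<string> = async {
    use reader = new StreamReader(stream)
    return! reader.ReadToEndAsync() |> Async.AwaitTask }
```

Note that disposing the StreamReader also closes the underlying stream, so the caller should not use the stream afterwards.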

Also, the F# PowerPack is open source and has a permissive license, so the best way to use it is often to copy just the few files you need into your project.