F#异步堆栈溢出

t0y*_*yv0 9 f# asynchronous mapreduce

我对基于异步的程序中的堆栈溢出感到惊讶.我怀疑主要问题是使用以下函数,它应该组成两个异步计算并行执行并等待两者完成:

let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
    async {
        let! x = Async.StartChild a
        let! y = Async.StartChild b
        do! x
        do! y
    }
Run Code Online (Sandbox Code Playgroud)

有了这个定义,我有以下mapReduce程序试图mapreduce部分和部分中利用并行性.非正式地,我们的想法是使用共享通道激发N映射器和N-1缩减器,等待它们完成,并从通道读取结果.我有自己的Channel实现,这里替换ConcurrentBag为更短的代码(问题影响两者):

let mapReduce (map    : 'T1 -> Async<'T2>)
              (reduce : 'T2 -> 'T2 -> Async<'T2>)
              (input  : seq<'T1>) : Async<'T2> =
    let bag = System.Collections.Concurrent.ConcurrentBag()

    let rec read () =
        async {
            match bag.TryTake() with
            | true, value -> return value
            | _           -> do! Async.Sleep 100
                             return! read ()
        }

    let write x =
        bag.Add x
        async.Return ()

    let reducer =
        async {
            let! x = read ()
            let! y = read ()
            let! r = reduce x y
            return bag.Add r
        }

    let work =
        input
        |> Seq.map (fun x -> async.Bind(map x, write))
        |> Seq.reduce (fun m1 m2 -> m1 <|> m2 <|> reducer)

    async {
        do! work
        return! read ()
    }
Run Code Online (Sandbox Code Playgroud)

现在,以下基本测试开始在n = 10000上抛出StackOverflowException:

let test n  =
    let map x      = async.Return x
    let reduce x y = async.Return (x + y)
    mapReduce map reduce [0..n]
    |> Async.RunSynchronously
Run Code Online (Sandbox Code Playgroud)

编辑:<|>组合器的替代实现使测试在N = 10000时成功:

let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
  Async.FromContinuations(fun (ok, _, _) ->
    let count = ref 0
    let ok () =
        lock count (fun () ->
            match !count with
            | 0 -> incr count
            | _ -> ok ())
    Async.Start <|
        async {
            do! a
            return ok ()
        }
    Async.Start <|
        async {
            do! b
            return ok ()
        })
Run Code Online (Sandbox Code Playgroud)

这对我来说真的很令人惊讶,因为这是我所假设Async.StartChild的.有关哪种解决方案最佳的想法?

Tom*_*cek 4

我认为在启动使用运算符创建的异步工作流程时会发生堆栈溢出异常<|>。调用Async.StartChild启动第一个工作流程,该工作流程使用<|>and 进行组合,因此它再次调用Async.StartChildetc。

修复此问题的一个简单方法是在计时器的处理程序中安排工作流程(以便它不会添加到当前堆栈中)。就像是:

let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
    async {
        do! Async.Sleep 1
        let! x = Async.StartChild a
        let! y = Async.StartChild b
        do! x
        do! y }
Run Code Online (Sandbox Code Playgroud)

解决此问题的更好方法是创建您自己的Seq.reduce- 当前实现将其逐一折叠,这样您将获得一棵深度为 10000 的树,其中仅包含右侧的一个工作项以及所有其他工作项在左侧。如果您创建了工作项的平衡二叉树,那么它不应该堆栈溢出,因为高度只有 15 左右。

编辑Seq.reduce尝试用以下函数替换:

module Seq = 
  let reduceBallanced f input =
    let arr = input |> Array.ofSeq
    let rec reduce s t =
      if s + 1 >= t then arr.[s]
      else 
        let m = (s + t) / 2
        f (reduce s m) (reduce m t)
    reduce 0 arr.Length
Run Code Online (Sandbox Code Playgroud)