f#mailboxprocessor - 无需等待传递即可回复

Kas*_*man 5 f# agent mailboxprocessor

我正在使用代理(MailboxProcessor)来进行需要响应的状态处理.

  • 呼叫者使用发布消息 MailboxProcessor.PostAndAsyncReply
  • 在代理内部,给出响应 AsyncReplyChannel.Reply

但是,我发现通过探索f#源代码,代理在响应传递之前不会处理下一条消息.总的来说这是件好事.但在我的情况下,代理更希望继续处理消息而不是等待响应传递.

做这样的事情以提供响应是否有问题?(或者有更好的选择吗?)

async { replyChannel.Reply response } |> Async.Start
Run Code Online (Sandbox Code Playgroud)

我意识到这种方法并不能保证响应按顺序传递.我没关系.

参考例子

// agent code
let doWork data =
    async { ... ; return response }

let rec loop ( inbox : MailboxProcessor<_> ) =
    async {
        let! msg = inbox.Receive()
        match msg with
        | None ->
            return ()

        | Some ( data, replyChannel ) ->
            let! response = doWork data
            replyChannel.Reply response (* waits for delivery, vs below *)
            // async { replyChannel.Reply response } |> Async.Start
            return! loop inbox
    }

let agent =
    MailboxProcessor.Start(loop)

// caller code
async {
    let! response =
        agent.PostAndAsyncReply(fun replyChannel -> Some (data, replyChannel))
    ...
}
Run Code Online (Sandbox Code Playgroud)

jbt*_*ule 1

FSharp.Control.AsyncSeq在邮箱处理器之上提供了一个更友好的界面。异步序列更容易理解,但是默认实现并行映射具有与所描述的相同的问题,等待序列中的前一个元素被映射以保留顺序。

因此,我创建了一个新函数,它只是原始的 AsyncSeq.mapAsyncParallel,经过修改,使其不再是真正的映射,因为它是无序的,但它确实映射了所有内容,并且惰性 seq 在工作完成时会取得进展。

AsyncSeq.mapAsyncParallelUnordered 的完整源代码

let mapAsyncParallelUnordered (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
  use mb = MailboxProcessor.Start (fun _ -> async.Return())
  let! err =
    s 
    |> AsyncSeq.iterAsyncParallel (fun a -> async {
      let! b = f a
      mb.Post (Some b) })
    |> Async.map (fun _ -> mb.Post None)
    |> Async.StartChildAsTask
  yield! 
    AsyncSeq.replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
  }
Run Code Online (Sandbox Code Playgroud)

下面是我如何在一个工具中使用它的示例,该工具使用 SSLlabs 免费且非常慢的 api,很容易过载。返回由 webapi 请求生成的parallelProcessHost惰性值,因此实际上运行请求并允许控制台在输入时打印结果,与发送的顺序无关。AsyncSeqAsyncSeq.mapAsyncParallelUnordered AsyncSeq.toListAsync

完整源码

let! es = 
    hosts
    |> Seq.indexed
    |> AsyncSeq.ofSeq
    |> AsyncSeq.map parallelProcessHost
    |> AsyncSeq.mapAsyncParallelUnordered AsyncSeq.toListAsync
    |> AsyncSeq.indexed
    |> AsyncSeq.map (fun (i, tail) -> (consoleN "-- %d of %i --- %O --" (i+1L) totalHosts (DateTime.UtcNow - startTime)) :: tail )
    |> AsyncSeq.collect AsyncSeq.ofSeq
    |> AsyncSeq.map stdoutOrStatus //Write out to console
    |> AsyncSeq.fold (|||) ErrorStatus.Okay
Run Code Online (Sandbox Code Playgroud)