如何正确使用F#中的TryScan

ste*_*tej 5 concurrency f# messages mailboxprocessor

我试图找到一个关于如何使用的例子TryScan,但没有找到任何,你能帮助我吗?

我想做什么(非常简单的例子):我有一个MailboxProcessor接受两种类型的消息.

  • 第一个GetState返回当前状态. GetState消息经常发送

  • 另一个UpdateState非常昂贵(耗时) - 例如从互联网下载某些内容然后相应地更新状态. UpdateState被称为很少.

我的问题是 - 消息GetState被阻止并等到前面UpdateState服务.这就是为什么我试图用来TryScan处理所有GetState消息,但没有运气.

我的示例代码:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                mbox.TryScan(fun m ->
                    match m with 
                    | GetState(chnl) -> 
                        printfn "G processing TryScan"
                        chnl.Reply(state)
                        Some(async { return! loop state})
                    | _ -> None
                ) |> ignore

                let! msg = mbox.Receive()
                match msg with
                | UpdateState ->
                    printfn "U processing"
                    // something very time consuming here...
                    async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                    return! loop (state+1)
                | GetState(chnl) ->
                    printfn "G processing"
                    chnl.Reply(state)
                    return! loop state
             }
             loop 0
)

[async { for i in 1..10 do 
          printfn " U"
          mbox.Post(UpdateState)
          async { do! Async.Sleep(200) } |> Async.RunSynchronously
};
async { // wait some time so that several `UpdateState` messages are fired
        async { do! Async.Sleep(500) } |> Async.RunSynchronously
        for i in 1..20 do 
          printfn "G"
          printfn "%d" (mbox.PostAndReply(GetState))
}] |> Async.Parallel |> Async.RunSynchronously
Run Code Online (Sandbox Code Playgroud)

如果您尝试运行代码,您将看到,该GetState消息几乎不会被处理,因为它等待结果.另一方面UpdateState,只是一劳永逸,因此有效地阻止了国家.

编辑

目前适用于我的解决方案是这样的:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                let! res = mbox.TryScan((function
                    | GetState(chnl) -> Some(async {
                            chnl.Reply(state)
                            return state
                        })
                    | _ -> None
                ), 5)

                match res with
                | None ->
                    let! msg = mbox.Receive()
                    match msg with
                        | UpdateState ->
                            async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                            return! loop (state+1)
                        | _ -> return! loop state
                | Some n -> return! loop n
             }
             loop 0
)
Run Code Online (Sandbox Code Playgroud)

对评论的反应:与其他MailboxProcessor或并行ThreadPool执行的想法UpdateState很好,但我目前不需要它.我想做的就是处理所有GetState消息,然后处理其他消息.我不关心在处理过程UpdateState中代理是否被阻止.

我会告诉你输出的问题是什么:

// GetState messages are delayed 500 ms - see do! Async.Sleep(500)
// each UpdateState is sent after 200ms
// each GetState is sent immediatelly! (not real example, but illustrates the problem)
 U            200ms   <-- issue UpdateState
U processing          <-- process UpdateState, it takes 1sec, so other 
 U            200ms       5 requests are sent; sent means, that it is
 U            200ms       fire-and-forget message - it doesn't wait for any result
                          and therefore it can send every 200ms one UpdateState message
G                     <-- first GetState sent, but waiting for reply - so all 
                          previous UpdateState messages have to be processed! = 3 seconds
                          and AFTER all the UpdateState messages are processed, result
                          is returned and new GetState can be sent. 
 U            200ms
 U            200ms       because each UpdateState takes 1 second
 U            200ms
U processing
 U
 U
 U
 U
U processing
G processing          <-- now first GetState is processed! so late? uh..
U processing          <-- takes 1sec
3
G
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
G processing          <-- after MANY seconds, second GetState is processed!
10
G
G processing
// from this line, only GetState are issued and processed, because 
// there is no UpdateState message in the queue, neither it is sent
Run Code Online (Sandbox Code Playgroud)

Tom*_*cek 4

我认为该TryScan方法在这种情况下不会对您有帮助。它允许您指定等待消息时使用的超时。一旦收到消息,它将开始处理该消息(忽略超时)。

例如,如果您想等待某些特定消息,但每秒(在等待时)执行一些其他检查,您可以编写:

let loop () = async {
  let! res = mbox.TryScan(function
    | ImportantMessage -> Some(async { 
          // process message 
          return 0
        })
    | _ -> None)
  match res with
  | None -> 
       // perform some check & continue waiting
       return! loop ()
  | Some n -> 
       // ImportantMessage was received and processed 
}
Run Code Online (Sandbox Code Playgroud)

在处理邮件时,如何避免阻塞邮箱处理器UpdateState?邮箱处理器(逻辑上)是单线程的 - 您可能不想取消UpdateState消息的处理,因此最好的选择是开始在后台处理它并等待处理完成。然后,处理的代码UpdateState可以将一些消息发送回邮箱(例如UpdateStateCompleted)。

这是一个草图:

let rec loop (state) = async {
  let! msg = mbox.Receive()
  match msg with
  | GetState(repl) -> 
      repl.Reply(state)
      return! scanning state
  | UpdateState -> 
      async { 
        // complex calculation (runs in parallel)
        mbox.Post(UpdateStateCompleted newState) }
      |> Async.Start
  | UpdateStateCompleted newState ->
      // Received new state from background workflow
      return! loop newState }
Run Code Online (Sandbox Code Playgroud)

现在后台任务是并行运行的,您需要注意可变状态。另外,如果您发送UpdateState消息的速度比处理消息的速度快,您就会遇到麻烦。例如,当您已经处理前一个请求时,可以通过忽略或排队请求来解决此问题。