ste*_*tej 5 concurrency f# messages mailboxprocessor
我试图找到一个关于如何使用的例子TryScan,但没有找到任何,你能帮助我吗?
我想做什么(非常简单的例子):我有一个MailboxProcessor接受两种类型的消息.
第一个GetState返回当前状态.
GetState消息经常发送
另一个UpdateState非常昂贵(耗时) - 例如从互联网下载某些内容然后相应地更新状态.
UpdateState被称为很少.
我的问题是 - 消息GetState被阻止并等到前面UpdateState服务.这就是为什么我试图用来TryScan处理所有GetState消息,但没有运气.
我的示例代码:
type Msg = GetState of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
let rec loop state = async {
// this TryScan doesn't work as expected
// it should process GetState messages and then continue
mbox.TryScan(fun m ->
match m with
| GetState(chnl) ->
printfn "G processing TryScan"
chnl.Reply(state)
Some(async { return! loop state})
| _ -> None
) |> ignore
let! msg = mbox.Receive()
match msg with
| UpdateState ->
printfn "U processing"
// something very time consuming here...
async { do! Async.Sleep(1000) } |> Async.RunSynchronously
return! loop (state+1)
| GetState(chnl) ->
printfn "G processing"
chnl.Reply(state)
return! loop state
}
loop 0
)
[async { for i in 1..10 do
printfn " U"
mbox.Post(UpdateState)
async { do! Async.Sleep(200) } |> Async.RunSynchronously
};
async { // wait some time so that several `UpdateState` messages are fired
async { do! Async.Sleep(500) } |> Async.RunSynchronously
for i in 1..20 do
printfn "G"
printfn "%d" (mbox.PostAndReply(GetState))
}] |> Async.Parallel |> Async.RunSynchronously
Run Code Online (Sandbox Code Playgroud)
如果您尝试运行代码,您将看到,该GetState消息几乎不会被处理,因为它等待结果.另一方面UpdateState,只是一劳永逸,因此有效地阻止了国家.
编辑
目前适用于我的解决方案是这样的:
type Msg = GetState of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
let rec loop state = async {
// this TryScan doesn't work as expected
// it should process GetState messages and then continue
let! res = mbox.TryScan((function
| GetState(chnl) -> Some(async {
chnl.Reply(state)
return state
})
| _ -> None
), 5)
match res with
| None ->
let! msg = mbox.Receive()
match msg with
| UpdateState ->
async { do! Async.Sleep(1000) } |> Async.RunSynchronously
return! loop (state+1)
| _ -> return! loop state
| Some n -> return! loop n
}
loop 0
)
Run Code Online (Sandbox Code Playgroud)
对评论的反应:与其他MailboxProcessor或并行ThreadPool执行的想法UpdateState很好,但我目前不需要它.我想做的就是处理所有GetState消息,然后处理其他消息.我不关心在处理过程UpdateState中代理是否被阻止.
我会告诉你输出的问题是什么:
// GetState messages are delayed 500 ms - see do! Async.Sleep(500)
// each UpdateState is sent after 200ms
// each GetState is sent immediatelly! (not real example, but illustrates the problem)
U 200ms <-- issue UpdateState
U processing <-- process UpdateState, it takes 1sec, so other
U 200ms 5 requests are sent; sent means, that it is
U 200ms fire-and-forget message - it doesn't wait for any result
and therefore it can send every 200ms one UpdateState message
G <-- first GetState sent, but waiting for reply - so all
previous UpdateState messages have to be processed! = 3 seconds
and AFTER all the UpdateState messages are processed, result
is returned and new GetState can be sent.
U 200ms
U 200ms because each UpdateState takes 1 second
U 200ms
U processing
U
U
U
U
U processing
G processing <-- now first GetState is processed! so late? uh..
U processing <-- takes 1sec
3
G
U processing <-- takes 1sec
U processing <-- takes 1sec
U processing <-- takes 1sec
U processing <-- takes 1sec
U processing <-- takes 1sec
U processing <-- takes 1sec
G processing <-- after MANY seconds, second GetState is processed!
10
G
G processing
// from this line, only GetState are issued and processed, because
// there is no UpdateState message in the queue, neither it is sent
Run Code Online (Sandbox Code Playgroud)
我认为该TryScan方法在这种情况下不会对您有帮助。它允许您指定等待消息时使用的超时。一旦收到消息,它将开始处理该消息(忽略超时)。
例如,如果您想等待某些特定消息,但每秒(在等待时)执行一些其他检查,您可以编写:
let loop () = async {
let! res = mbox.TryScan(function
| ImportantMessage -> Some(async {
// process message
return 0
})
| _ -> None)
match res with
| None ->
// perform some check & continue waiting
return! loop ()
| Some n ->
// ImportantMessage was received and processed
}
Run Code Online (Sandbox Code Playgroud)
在处理邮件时,如何避免阻塞邮箱处理器UpdateState?邮箱处理器(逻辑上)是单线程的 - 您可能不想取消UpdateState消息的处理,因此最好的选择是开始在后台处理它并等待处理完成。然后,处理的代码UpdateState可以将一些消息发送回邮箱(例如UpdateStateCompleted)。
这是一个草图:
let rec loop (state) = async {
let! msg = mbox.Receive()
match msg with
| GetState(repl) ->
repl.Reply(state)
return! scanning state
| UpdateState ->
async {
// complex calculation (runs in parallel)
mbox.Post(UpdateStateCompleted newState) }
|> Async.Start
| UpdateStateCompleted newState ->
// Received new state from background workflow
return! loop newState }
Run Code Online (Sandbox Code Playgroud)
现在后台任务是并行运行的,您需要注意可变状态。另外,如果您发送UpdateState消息的速度比处理消息的速度快,您就会遇到麻烦。例如,当您已经处理前一个请求时,可以通过忽略或排队请求来解决此问题。
| 归档时间: |
|
| 查看次数: |
723 次 |
| 最近记录: |