取消不同类型的Asyncs时,我遇到了看似不一致的行为问题.
为了重现这个问题,让我们说有一个函数可以获取一个"作业"列表(Async <_>列表),等待它们完成并打印出它们的结果.该函数还会获取取消令牌,因此可以取消它:
let processJobs jobs cancel =
Async.Start(async {
try
let! results = jobs |> Async.Parallel
printfn "%A" results
finally
printfn "stopped"
}, cancel)
Run Code Online (Sandbox Code Playgroud)
该函数被调用如下:
let jobs = [job1(); job2(); job3(); job4(); job5()]
use cancel = new CancellationTokenSource()
processJobs jobs cancel.Token
Run Code Online (Sandbox Code Playgroud)
稍后它会被取消:
Thread.Sleep(1000)
printfn "cancelling..."
cancel.Cancel()
Run Code Online (Sandbox Code Playgroud)
取消令牌源被取消后,该函数应执行finally块并打印"已停止".
这适用于job1,2和3,但是当列表中有job4或job5时不起作用.
Job1只是Async.Sleeps:
let job1() = async {
do! Async.Sleep 1000000
return 10
}
Run Code Online (Sandbox Code Playgroud)
Job2启动一些异步子进程并等待它们:
let job2() = async {
let! child1 = Async.StartChild(async {
do! Async.Sleep 1000000
return 10
})
let! child2 = Async.StartChild(async {
do! Async.Sleep 1000000
return 10
})
let! results = [child1; child2] |> Async.Parallel
return results |> Seq.sum
}
Run Code Online (Sandbox Code Playgroud)
Job3等待一些丑陋的等待句柄,这是由一些更丑陋的线程设置的:
let job3() = async {
use doneevent = new ManualResetEvent(false)
let thread = Thread(fun () -> Thread.Sleep(1000000); doneevent.Set() |> ignore)
thread.Start()
do! Async.AwaitWaitHandle(doneevent :> WaitHandle) |> Async.Ignore
return 30
}
Run Code Online (Sandbox Code Playgroud)
Job4发布并等待来自MailboxProcessor的回复:
let job4() = async {
let worker = MailboxProcessor.Start(fun inbox -> async {
let! (msg:AsyncReplyChannel<int>) = inbox.Receive()
do! Async.Sleep 1000000
msg.Reply 40
})
return! worker.PostAndAsyncReply (fun reply -> reply) // <- cannot cancel this
}
Run Code Online (Sandbox Code Playgroud)
Job5等待Task(或TaskCompletionSource):
let job5() = async {
let tcs = TaskCompletionSource<int>()
Async.Start(async {
do! Async.Sleep 1000000
tcs.SetResult 50
})
return! (Async.AwaitTask tcs.Task) // <- cannot cancel this
}
Run Code Online (Sandbox Code Playgroud)
为什么可以取消Job1,2和3("停止"打印),而Job4和5使该功能"永久"挂起?
到目前为止,我总是依靠F#来处理幕后的取消 - 只要我在async-blocks并使用!s(让!,做!,返回!,......)一切都应该没问题......但是似乎并非一直如此.
在F#异步工作流中,CancellationToken对象在封面下自动传递.这意味着我们不必做任何特殊的事情来支持取消.在运行异步工作流时,我们可以为其提供取消令牌,一切都会自动生效.
完整代码可在此处获取:http://codepad.org/euVO3xgP
编辑
我注意到通过Async.StartAsTask和Async.AwaitTask管道异步使得它在所有情况下都可以取消.
即对于Job4,这意味着更改线路:
return! worker.PostAndAsyncReply (fun reply -> reply)
Run Code Online (Sandbox Code Playgroud)
至:
return! cancelable <| worker.PostAndAsyncReply (fun reply -> reply)
Run Code Online (Sandbox Code Playgroud)
可取消的是:
let cancelable (x:Async<_>) = async {
let! cancel = Async.CancellationToken
return! Async.StartAsTask(x, cancellationToken = cancel) |> Async.AwaitTask
}
Run Code Online (Sandbox Code Playgroud)
同样可以使Job5取消.
但是......这只是一种解决方法,我几乎无法在每次调用时将其置于未知的异步<_>.
只有异步。方法本身使用默认的 CancellationToken 进行处理。
在您的 MailboxProcessor 示例中,取消应该在 Start 方法上进行
let! ct= Async.CancellationToken
use worker := MailboxProcessor.Start( theWork, ct)
Run Code Online (Sandbox Code Playgroud)
在 TaskCompletionSource 示例中,您必须注册一个回调来取消它。
let! ct = Async.CancellationToken
use canceler = ct.Register( fun () -> tcs.TrySetCanceled() |> ignore )
Run Code Online (Sandbox Code Playgroud)