取消不同类型的Asyncs时行为不一致

stm*_*max 6 f#

取消不同类型的Asyncs时,我遇到了看似不一致的行为问题.

为了重现这个问题,让我们说有一个函数可以获取一个"作业"列表(Async <_>列表),等待它们完成并打印出它们的结果.该函数还会获取取消令牌,因此可以取消它:

let processJobs jobs cancel =
  Async.Start(async {
    try
      let! results = jobs |> Async.Parallel
      printfn "%A" results
    finally
      printfn "stopped"
  }, cancel)
Run Code Online (Sandbox Code Playgroud)

该函数被调用如下:

let jobs = [job1(); job2(); job3(); job4(); job5()]
use cancel = new CancellationTokenSource()

processJobs jobs cancel.Token
Run Code Online (Sandbox Code Playgroud)

稍后它会被取消:

Thread.Sleep(1000)
printfn "cancelling..."
cancel.Cancel()
Run Code Online (Sandbox Code Playgroud)

取消令牌源被取消后,该函数应执行finally块并打印"已停止".

这适用于job1,2和3,但是当列表中有job4或job5时不起作用.

Job1只是Async.Sleeps:

let job1() = async {
  do! Async.Sleep 1000000
  return 10
}
Run Code Online (Sandbox Code Playgroud)

Job2启动一些异步子进程并等待它们:

let job2() = async {
  let! child1 = Async.StartChild(async {
    do! Async.Sleep 1000000
    return 10
  })

  let! child2 = Async.StartChild(async {
    do! Async.Sleep 1000000
    return 10
  })

  let! results = [child1; child2] |> Async.Parallel
  return results |> Seq.sum
}
Run Code Online (Sandbox Code Playgroud)

Job3等待一些丑陋的等待句柄,这是由一些更丑陋的线程设置的:

let job3() = async {
  use doneevent = new ManualResetEvent(false)

  let thread = Thread(fun () -> Thread.Sleep(1000000); doneevent.Set() |> ignore)
  thread.Start()

  do! Async.AwaitWaitHandle(doneevent :> WaitHandle) |> Async.Ignore

  return 30
}
Run Code Online (Sandbox Code Playgroud)

Job4发布并等待来自MailboxProcessor的回复:

let job4() = async {
  let worker = MailboxProcessor.Start(fun inbox -> async {
    let! (msg:AsyncReplyChannel<int>) = inbox.Receive()
    do! Async.Sleep 1000000
    msg.Reply 40
  })

  return! worker.PostAndAsyncReply (fun reply -> reply) // <- cannot cancel this
}
Run Code Online (Sandbox Code Playgroud)

Job5等待Task(或TaskCompletionSource):

let job5() = async {
  let tcs = TaskCompletionSource<int>()

  Async.Start(async {
    do! Async.Sleep 1000000
    tcs.SetResult 50
  })

  return! (Async.AwaitTask tcs.Task) // <- cannot cancel this
}
Run Code Online (Sandbox Code Playgroud)

为什么可以取消Job1,2和3("停止"打印),而Job4和5使该功能"永久"挂起?

到目前为止,我总是依靠F#来处理幕后的取消 - 只要我在async-blocks并使用!s(让!,做!,返回!,......)一切都应该没问题......但是似乎并非一直如此.

Quote:

在F#异步工作流中,CancellationToken对象在封面下自动传递.这意味着我们不必做任何特殊的事情来支持取消.在运行异步工作流时,我们可以为其提供取消令牌,一切都会自动生效.

完整代码可在此处获取:http://codepad.org/euVO3xgP

编辑

我注意到通过Async.StartAsTask和Async.AwaitTask管道异步使得它在所有情况下都可以取消.

即对于Job4,这意味着更改线路:

return! worker.PostAndAsyncReply (fun reply -> reply)
Run Code Online (Sandbox Code Playgroud)

至:

return! cancelable <| worker.PostAndAsyncReply (fun reply -> reply)
Run Code Online (Sandbox Code Playgroud)

可取消的是:

let cancelable (x:Async<_>) = async {
  let! cancel = Async.CancellationToken
  return! Async.StartAsTask(x, cancellationToken = cancel) |> Async.AwaitTask
}
Run Code Online (Sandbox Code Playgroud)

同样可以使Job5取消.

但是......这只是一种解决方法,我几乎无法在每次调用时将其置于未知的异步<_>.

jyo*_*ung 1

只有异步。方法本身使用默认的 CancellationToken 进行处理。

在您的 MailboxProcessor 示例中,取消应该在 Start 方法上进行

let! ct= Async.CancellationToken
use worker := MailboxProcessor.Start( theWork, ct) 
Run Code Online (Sandbox Code Playgroud)

在 TaskCompletionSource 示例中,您必须注册一个回调来取消它。

let! ct = Async.CancellationToken
use canceler = ct.Register( fun () -> tcs.TrySetCanceled() |> ignore )
Run Code Online (Sandbox Code Playgroud)