在 F# 中一次强制执行一个 Async Observable

Tom*_*Tom 2 f# system.reactive async-await

我有一系列可观察的事物需要映射到 C# 任务。这些 C# 任务不应该同时运行,而是一个接一个地运行。基本上,我需要实现这个 C# 问题的 F# 等价物:

一次强制执行一个异步可观察对象

天真地翻译这个 C# 代码会得到如下内容:

let run (idx:int) (delay:int) =
    async {
        sprintf "start: %i (%i)" idx delay |> System.Diagnostics.Trace.WriteLine
        let! t = System.Threading.Tasks.Task.Delay(delay) |> Async.AwaitTask
        sprintf "finish: %i" idx  |> System.Diagnostics.Trace.WriteLine
        t
    }

    Observable.generate (new Random()) (fun _ -> true) id (fun s -> s.Next(250, 500))
    |> Observable.take 20
    |> Observable.mapi(fun idx delay -> idx, delay)
    |> Observable.bind(fun (idx, delay) -> Observable.ofAsync (run idx delay))
    |> Observable.subscribe ignore
    |> ignore
Run Code Online (Sandbox Code Playgroud)

这不能按预期工作,因为我不会在任何地方等待结果。有没有办法在 F# 中做到这一点而不阻塞线程,就像 C# 的 await 那样?

Stu*_*art 5

F# 中有一个方便的库,称为 AsyncSeq:https ://www.nuget.org/packages/FSharp.Control.AsyncSeq/

它与IAsyncEnumerable<T>添加到 C# 8.0 中的非常相似,这为您提供了一个很好的解决方案。

解决方案:

open System
open FSharp.Control
open FSharp.Control.Reactive

[<EntryPoint>]
let main _ =

    let run (idx:int) (delay:int) =
        async {
            sprintf "start: %i (%i)" idx delay |> Console.WriteLine
            do! Async.Sleep delay
            sprintf "finish: %i" idx  |> Console.WriteLine
        }

    Observable.generate (new Random()) (fun _ -> true) id (fun s -> s.Next(250, 500))
    |> Observable.take 20
    |> Observable.mapi(fun idx delay -> idx, delay)
    |> AsyncSeq.ofObservableBuffered
    |> AsyncSeq.iterAsync (fun (idx,delay) -> run idx delay)
    |> Async.RunSynchronously

    0
Run Code Online (Sandbox Code Playgroud)

AsyncSeq.ofObservableBuffered完成订阅您的工作Observable并充当AsyncSeq您可以在其上进行管道传输的源。最后,我们调用Async.RunSynchronously将其启动并等待入口点线程。

注意:我也在run它返回时更新Async<Async<unit>>,我认为这不是故意的。