基于无限序列的Observable动画图表

Mar*_*ann 9 f# system.reactive fsharpchart

我在使用FSharp.ChartingFSharp.Control.Reactive创建我的一些数据的动画可视化时遇到了麻烦.

基本上,我有一个无限的点生成器.我的实际生成器比下面的示例更复杂,但是这个简化的示例再现了这个问题:

let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))
Run Code Online (Sandbox Code Playgroud)

move功能具有类型(int * int) list -> seq<(int * int) list>.对于每次迭代,它将输入列表中的所有点都转换为1,以使点向上和向右移动.

我想在这上面展示一下LiveChart.Point.生成的序列move可以转换为适当的Observable,但是它本身运行得非常快,所以我先把它慢一点:

// Slow down any sequence
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
    duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
    yield x }
Run Code Online (Sandbox Code Playgroud)

这使我能够从点序列中创建一个Observable:

let obs =
    [(1, 0); (0, 1)]
    |> move
    |> delay (TimeSpan.FromSeconds 0.5)
    |> Observable.toObservable
Run Code Online (Sandbox Code Playgroud)

我还可以看到,如果我打印到控制台,它的工作原理:

obs.Subscribe (fun x -> printfn "%O" x)
Run Code Online (Sandbox Code Playgroud)

通过打印到控制台,很明显这会阻止执行环境; 例如,如果您将脚本发送到F#Interactive(FSI),它将继续打印,您将不得不取消评估或重置会话以便停止它.

我的理论是因为它obs与执行环境在同一个线程上运行.

如果我尝试用LiveChart.Point它来做同样的事情:

let lc = obs |> LiveChart.Point
lc.ShowChart()
Run Code Online (Sandbox Code Playgroud)

如果我发送给FSI,没有任何反应(没有显示图表)和FSI阻止.

这似乎与我的理论一致,即观察者与图表在同一个线程上运行.

如何让Observer在不同的线程上运行?

我发现Observable.observeOn,这需要一个IScheduler.浏览MSDN,我发现了NewThreadScheduler,ThreadPoolSchedulerTaskPoolScheduler,所有这些都实现了IScheduler.这些类的名称听起来很有希望,但我找不到它们!

根据文档,它们都是定义的System.Reactive.dll,但是虽然我拥有FSharp.Control.Reactive的所有依赖,但我没有任何组件.搜索互联网还没有透露在哪里获得它.

它是Reactive Extensions的旧版本还是更新版本?我是否正朝着正确的方向前进?

如何可视化我的无限点序列LiveChart


这是一个重现问题的完整脚本:

#r @"../packages/Rx-Interfaces.2.2.5/lib/net45/System.Reactive.Interfaces.dll"
#r @"../packages/Rx-Core.2.2.5/lib/net45/System.Reactive.Core.dll"
#r @"../packages/Rx-Linq.2.2.5/lib/net45/System.Reactive.Linq.dll"
#r @"../packages/FSharp.Control.Reactive.3.2.0/lib/net40/FSharp.Control.Reactive.dll"
#r @"../packages/FSharp.Charting.0.90.12/lib/net40/FSharp.Charting.dll"
#r "System.Windows.Forms.DataVisualization"

open System
open FSharp.Control.Reactive
open FSharp.Charting

let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))

// Slow down any sequence
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
    duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
    yield x }

let obs =
    [(1, 0); (0, 1)]
    |> move
    |> delay (TimeSpan.FromSeconds 0.5)
    |> Observable.toObservable

//obs.Subscribe (fun x -> printfn "%O" x)

let lc = obs |> LiveChart.Point
lc.ShowChart()
Run Code Online (Sandbox Code Playgroud)

已安装的NuGet包:

Id                                  Versions
--                                  --------
FSharp.Charting                     {0.90.12}
FSharp.Control.Reactive             {3.2.0}
FSharp.Core                         {3.1.2}
Rx-Core                             {2.2.5}
Rx-Interfaces                       {2.2.5}
Rx-Linq                             {2.2.5}
Run Code Online (Sandbox Code Playgroud)

Kev*_*vin 6

诀窍是使用subscribeOn.来自introtorx.com上的subscribeOn和observeOn:

我想在这里指出的一个陷阱是,我最初几次使用这些重载,我对他们实际做的事感到困惑.您应该使用SubscribeOn方法来描述您希望如何安排任何预热和后台处理代码.例如,如果要将SubscribeOn与Observable.Create一起使用,则传递给Create方法的委托将在指定的调度程序上运行.

ObserveOn方法用于声明您希望将通知安排到的位置.我建议使用ObserveOn方法在使用STA系统时最有用,最常见的是UI应用程序.

完整脚本如下:

#r @"packages/Rx-Interfaces.2.2.5/lib/net45/System.Reactive.Interfaces.dll"
#r @"packages/Rx-PlatformServices.2.2.5/lib/net45/System.Reactive.PlatformServices.dll"
#r @"packages/Rx-Core.2.2.5/lib/net45/System.Reactive.Core.dll"
#r @"packages/Rx-Linq.2.2.5/lib/net45/System.Reactive.Linq.dll"
#r @"packages/FSharp.Control.Reactive.3.2.0/lib/net40/FSharp.Control.Reactive.dll"
#r @"packages/FSharp.Charting.0.90.12/lib/net40/FSharp.Charting.dll"
#r "System.Windows.Forms.DataVisualization"

open System
open FSharp.Control.Reactive
open FSharp.Charting
open System.Reactive.Concurrency

let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))

// Slow down any sequence
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
    duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
    yield x }

let obs =
    [(1, 0); (0, 1)]
    |> move
    |> delay (TimeSpan.FromSeconds 0.5)
    |> Observable.toObservable
    |> Observable.subscribeOn NewThreadScheduler.Default 

let lc = obs |> LiveChart.Point
lc.ShowChart()
Run Code Online (Sandbox Code Playgroud)

通常在UI应用程序中,您可以将subscribeOn和observeOn配对,以确保在UI线程上传回结果.这里似乎不需要,因为它看起来像图表处理这个(适合我).