Hol*_*oed 11 f# computation-expression system.reactive
我正在尝试使用Rx Builder在F#Computation Expression语法中使用Reactive Extension.如何修复它以使它不会破坏堆栈?像下面的Seq示例一样.是否有任何计划提供RxBuilder的实现作为Reactive Extensions的一部分或作为.NET Framework未来版本的一部分?
open System
open System.Linq
open System.Reactive.Linq
type rxBuilder() =
member this.Delay f = Observable.Defer f
member this.Combine (xs: IObservable<_>, ys : IObservable<_>) =
Observable.merge xs ys
member this.Yield x = Observable.Return x
member this.YieldFrom (xs:IObservable<_>) = xs
let rx = rxBuilder()
let rec f x = seq { yield x
yield! f (x + 1) }
let rec g x = rx { yield x
yield! g (x + 1) }
//do f 5 |> Seq.iter (printfn "%A")
do g 5 |> Observable.subscribe (printfn "%A") |> ignore
do System.Console.ReadLine() |> ignore
Run Code Online (Sandbox Code Playgroud)
简短的回答是Rx Framework不支持使用这样的递归模式生成observable,因此无法轻松完成.将Combine
一个用于F#序列操作需要一些特殊的处理是观测不提供.Rx框架可能希望您Observable.Generate
使用LINQ查询/ F#计算构建器来生成observable ,然后使用它们来处理它们.
无论如何,这里有一些想法 -
首先,您需要更换Observable.merge
有Observable.Concat
.第一个并行运行两个observable,而第二个首先从第一个observable生成所有值,然后从第二个observable生成值.在此更改之后,代码段将至少在堆栈溢出之前打印~800个数字.
堆栈溢出的原因是Concat
创建一个可观察的调用Concat
来创建另一个调用的observable Concat
等.解决此问题的一种方法是添加一些同步.如果您使用的是Windows窗体,则可以进行修改,Delay
以便在GUI线程上调度observable(丢弃当前堆栈).这是一个草图:
type RxBuilder() =
member this.Delay f =
let sync = System.Threading.SynchronizationContext.Current
let res = Observable.Defer f
{ new IObservable<_> with
member x.Subscribe(a) =
sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
// Note: This is wrong, but we cannot easily get the IDisposable here
null }
member this.Combine (xs, ys) = Observable.Concat(xs, ys)
member this.Yield x = Observable.Return x
member this.YieldFrom (xs:IObservable<_>) = xs
Run Code Online (Sandbox Code Playgroud)
要正确实现这一点,您必须编写自己的Concat
方法,这非常复杂.这个想法是:
IConcatenatedObservable
IConcatenatedObservable
该引用的链Concat
方法将查找此链,并且当存在例如三个对象时,它将丢弃中间的一个(以始终保持长度链最多为2).这对于StackOverflow答案来说有点过于复杂,但对于Rx团队来说这可能是一个有用的反馈.
小智 9
请注意,这已在Rx v2.0中修复(如此处所述),更常见的是针对所有排序运算符(Concat,Catch,OnErrorResumeNext)以及命令运算符(If,While等).
基本上,您可以将此类运算符视为在终端观察器消息中订阅另一个序列(例如,Concat在接收到当前的OnCompleted消息时订阅下一个序列),这是尾递归类比的来源.
在Rx v2.0中,所有尾递归预订都被扁平化为类似队列的数据结构,用于一次处理一个,与下游观察者交谈.这避免了观察者在连续序列订阅中相互交谈的无限增长.