如何更改Rx Builder实现以修复堆栈溢出异常?

Hol*_*oed 11 f# computation-expression system.reactive

我正在尝试使用Rx Builder在F#Computation Expression语法中使用Reactive Extension.如何修复它以使它不会破坏堆栈?像下面的Seq示例一样.是否有任何计划提供RxBuilder的实现作为Reactive Extensions的一部分或作为.NET Framework未来版本的一部分?

open System
open System.Linq
open System.Reactive.Linq

type rxBuilder() =    
    member this.Delay f = Observable.Defer f
    member this.Combine (xs: IObservable<_>, ys : IObservable<_>) = 
        Observable.merge xs ys      
    member this.Yield x = Observable.Return x
    member this.YieldFrom (xs:IObservable<_>) = xs

let rx = rxBuilder()

let rec f x = seq { yield x 
                    yield! f (x + 1) }

let rec g x = rx { yield x 
                    yield! g (x + 1) }


//do f 5 |> Seq.iter (printfn "%A")

do g 5 |> Observable.subscribe (printfn "%A") |> ignore

do System.Console.ReadLine() |> ignore
Run Code Online (Sandbox Code Playgroud)

Tom*_*cek 9

简短的回答是Rx Framework不支持使用这样的递归模式生成observable,因此无法轻松完成.将Combine一个用于F#序列操作需要一些特殊的处理是观测不提供.Rx框架可能希望您Observable.Generate使用LINQ查询/ F#计算构建器来生成observable ,然后使用它们来处理它们.

无论如何,这里有一些想法 -

首先,您需要更换Observable.mergeObservable.Concat.第一个并行运行两个observable,而第二个首先从第一个observable生成所有值,然后从第二个observable生成值.在此更改之后,代码段将至少在堆栈溢出之前打印~800个数字.

堆栈溢出的原因是Concat创建一个可观察的调用Concat来创建另一个调用的observable Concat等.解决此问题的一种方法是添加一些同步.如果您使用的是Windows窗体,则可以进行修改,Delay以便在GUI线程上调度observable(丢弃当前堆栈).这是一个草图:

type RxBuilder() =   
  member this.Delay f = 
      let sync = System.Threading.SynchronizationContext.Current 
      let res = Observable.Defer f
      { new IObservable<_> with
          member x.Subscribe(a) = 
            sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
            // Note: This is wrong, but we cannot easily get the IDisposable here
            null }
  member this.Combine (xs, ys) = Observable.Concat(xs, ys)
  member this.Yield x = Observable.Return x
  member this.YieldFrom (xs:IObservable<_>) = xs
Run Code Online (Sandbox Code Playgroud)

要正确实现这一点,您必须编写自己的Concat方法,这非常复杂.这个想法是:

  • Concat返回一些特殊类型,例如 IConcatenatedObservable
  • 当递归调用该方法时,您将相互创建IConcatenatedObservable该引用的链
  • Concat方法将查找此链,并且当存在例如三个对象时,它将丢弃中间的一个(以始终保持长度链最多为2).

这对于StackOverflow答案来说有点过于复杂,但对于Rx团队来说这可能是一个有用的反馈.

  • 谢谢.很好的答案.我希望Rx团队正在倾听:) (2认同)

小智 9

请注意,这已在Rx v2.0中修复(如此处所述),更常见的是针对所有排序运算符(Concat,Catch,OnErrorResumeNext)以及命令运算符(If,While等).

基本上,您可以将此类运算符视为在终端观察器消息中订阅另一个序列(例如,Concat在接收到当前的OnCompleted消息时订阅下一个序列),这是尾递归类比的来源.

在Rx v2.0中,所有尾递归预订都被扁平化为类似队列的数据结构,用于一次处理一个,与下游观察者交谈.这避免了观察者在连续序列订阅中相互交谈的无限增长.