F#Rx扩展具有并发事件的IObservable

vid*_*idi 2 f# multithreading reactive-programming system.reactive

我有以下使用FSharp.Reactive的F#代码.函数reactToEvents接受两个事件源并生成一个应用程序状态源.然后UI使用应用程序状态

open FSharp.Reactive

let reactToEvents (initialSettings:Settings) 
            (incomingTweets: ITweet IObservable) 
            (uiActions: UserInput IObservable) :  State IObservable = 
    let initialState = { settings = initialSettings; tweets = [] }

    let tweetInput = Observable.map TweetReceived incomingTweets 
    let userInput = Observable.map UserAction uiActions
    let allInput = Observable.merge userInput tweetInput

    Observable.scan transition initialState allInput
Run Code Online (Sandbox Code Playgroud)

在上面的函数中,在UI线程上生成uiActions时,在一个线程上生成incomingTweets事件.

像我上面那样使用Observable.merge合并两个源是否安全?

使用上面的Observable.scan扫描生成的源是否安全?

如果这不正确,那么正确的方法是什么?

谢谢!

更新1:

我希望至少Observable.merge不关心线程.我发现了这个,似乎说它们使用起来并不安全:

http://msdn.microsoft.com/en-us/library/ee353488.aspx

http://msdn.microsoft.com/en-us/library/ee353749.aspx

"对于每个观察者来说,注册的中间观察对象不是线程安全的.也就是说,源不会在不同的线程上同时触发源的观察."

那么正确的方法是什么呢?

更新2:

这是一个混乱.我链接的文档是用于从名称空间Microsoft.FSharp.Control.Observable合并和扫描的函数.这与我在我的代码中使用的Rx不同.对于真正的Rx,您需要使用库FSharp.Reactive和FSharp.Reactive.Observable下的函数.

Ana*_*tts 5

像我上面那样使用Observable.merge合并两个源是否安全?使用上面的Observable.scan扫描生成的源是否安全?

是的,Rx保证这是安全的.但是,要意识到结果会出现在不同的线程上.如果您希望随后使用结果更新UI元素,则需要通过ObserveOn(或F#等效项)将结果封送到UI线程

  • 有一份关于"Rx合同"的文件,但我无法找到它.Rx的核心思想是它为您提供了一个理智的并发抽象 - 您不必担心在操作员级别管理并发访问,操作员自己也会努力保证这是安全的.一个例外是Subjects,它不能同时OnNext'd(你需要一个锁) (2认同)