通过Rx从MailboxProcessor返回结果是一个好主意吗?

Dim*_*mka 5 f# system.reactive mailboxprocessor

我对下面的代码示例以及人们的想法有点好奇.我们的想法是从NetworkStream(~20 msg/s)读取而不是在main中工作,将事物传递给MainboxProcessor以便在完成时处理并返回绑定.

通常的方法是使用PostAndReply,但我想绑定到ListView或C#中的其他控件.必须使用LastN项目进行魔术并进行过滤.另外,Rx有一些错误处理.

下面的例子观察2..10的数字并返回"你好X".在8它停止像EOF.使其成为ToEnumerable,因为其他线程在完成之前完成,但它也适用于Subscribe.

困扰我的是什么:

  1. 在递归中传递Subject(obj).我认为其中大约有3-4个没有任何问题.好主意?
  2. 主体的一生.

open System
open System.Threading
open System.Reactive.Subjects
open System.Reactive.Linq  // NuGet, take System.Reactive.Core also.
open System.Reactive.Concurrency

type SerializedLogger() = 

    let _letters = new Subject<string>()
    // create the mailbox processor
    let agent = MailboxProcessor.Start(fun inbox -> 

        // the message processing function
        let rec messageLoop (letters:Subject<string>) = async{

            // read a message
            let! msg = inbox.Receive()

            printfn "mailbox: %d in Thread: %d" msg Thread.CurrentThread.ManagedThreadId
            do! Async.Sleep 100
            // write it to the log    
            match msg with
            | 8 -> letters.OnCompleted() // like EOF.
            | x -> letters.OnNext(sprintf "hello %d" x)

            // loop to top
            return! messageLoop letters
            }

        // start the loop
        messageLoop _letters
        )

    // public interface
    member this.Log msg = agent.Post msg
    member this.Getletters() = _letters.AsObservable()

/// Print line with prefix 1.
let myPrint1 x = printfn "onNext - %s,  Thread: %d" x  Thread.CurrentThread.ManagedThreadId

// Actions
let onNext = new Action<string>(myPrint1)
let onCompleted = new Action(fun _ -> printfn "Complete")

[<EntryPoint>]
let main argv = 
    async{
    printfn "Main is on: %d" Thread.CurrentThread.ManagedThreadId

    // test
    let logger = SerializedLogger()
    logger.Log 1 // ignored?

    let xObs = logger
                .Getletters() //.Where( fun x -> x <> "hello 5")
                .SubscribeOn(Scheduler.CurrentThread)
                .ObserveOn(Scheduler.CurrentThread)
                .ToEnumerable() // this
                //.Subscribe(onNext, onCompleted) // or with Dispose()

    [2..10] |> Seq.iter (logger.Log) 

    xObs |> Seq.iter myPrint1

    while true 
        do 
        printfn "waiting"
        System.Threading.Thread.Sleep(1000)

    return 0
    } |> Async.RunSynchronously // return an integer exit code
Run Code Online (Sandbox Code Playgroud)

Tom*_*cek 5

我做过类似的事情,但是使用普通的F#Event类型而不是Subject.它基本上允许您创建IObservable和触发其订阅 - 就像您使用更复杂的订阅一样Subject.基于事件的版本将是:

type SerializedLogger() = 
   let letterProduced = new Event<string>()
   let lettersEnded = new Event<unit>()
   let agent = MailboxProcessor.Start(fun inbox -> 
     let rec messageLoop (letters:Subject<string>) = async {
       // Some code omitted
       match msg with
       | 8 -> lettersEnded.Trigger()
       | x -> letterProduced.Trigger(sprintf "hello %d" x)
       // ...

member this.Log msg = agent.Post msg
member this.LetterProduced = letterProduced.Publish
member this.LettersEnded = lettersEnded.Publish
Run Code Online (Sandbox Code Playgroud)

重要的区别是:

  • Event无法触发OnCompleted,所以我反而暴露了两个独立的事件.这非常不幸!鉴于这Subject与所有其他方面的事件非常相似,这可能是使用主题而不是普通事件的一个很好的理由.

  • 使用的好处Event是它是标准的F#类型,因此您不需要代理中的任何外部依赖项.

  • 我注意到你的评论注意到第一次调用Log被忽略了.那是因为您只在此调用发生后才订阅事件处理程序.我认为你可以这里使用主题思想的ReplaySubject变体 - 当你订阅它时它会重放所有事件,所以之前发生的事件不会丢失(但是缓存需要花费).

总而言之,我认为使用Subject可能是一个好主意 - 它基本上与使用相同的模式Event(我认为这是暴露来自代理的通知的非常标准的方式),但它允许您触发OnCompleted.我可能不会使用ReplaySubject,因为缓存成本 - 你必须确保在触发任何事件之前订阅.