在RX中切换流:Sodium相当于RX中的合并和切换

jhe*_*dus -3 c# reactive-programming system.reactive sodium

如何能在电视频道的问题,因为在此解释的谈话在第31分钟的中通过RX解决?

Rx中表达的问题如下:

两个电视频道(channel1channel2)传输图像流,加上其中的流fuzz表示没有频道或白噪声.

有两个按钮可以发送事件eButton1eButton2按下它们.

按下这些按钮应该导致各个通道被发送到屏幕.

每个按钮按下应该被投影(映射)到相应的频道,然后所有频道组合成选择流作为以流开始的fuzz流的流.最后,交换机操作员将选定的流发送给screen.

什么相当于Sodiumswitch并在RX中合并?

是否有可能用纯高阶函数解决它?即不使用闭包?我不明白这是怎么可能的.

在此输入图像描述

Jam*_*rld 7

切换合并都存在于核心Rx库中,因此幻灯片中的代码实际上几乎逐字地将行转换为Rx.

Switch操作适用于流的数据流-中的Rx这是一个类型IObservable<IObservable<T>>.

交换机将这个流流变平,只将最新的流发送到它的输出,所以最终会得到一个IObservable<T>.

请参阅下面的C#示例.我尽可能地在谈话中重复使用变量名,所以这应该很容易理解.

唯一(非常轻微)不同的是hold函数被替换为Rx等价物StartWith.

包含nuget包Rx-Main并将其作为控制台应用程序运行.代码订阅screen流并开始将帧从"Fuzz"通道渲染到控制台.它会提示您输入频道号码.输入1或2,您将看到输出切换到相应通道的帧.

// helper method to create channels
private static IObservable<string> CreateChannelStream(
    string name, CompositeDisposable disposables)
{
    // this hacks together a demo channel stream -
    // a stream of "frames" for the channel
    // for simplicity rather than using images, I use a string
    // message for each frame
    // how it works isn't important, just know you'll get a
    // message event every second
    var channel = Observable.Interval(TimeSpan.FromSeconds(1))
                            .Select(x => name + " Frame: " + x)
                            .Publish();
    disposables.Add(channel.Connect());
    return channel;
}

public static void Main()
{       
    // for cleaning up the hot channel streams
    var disposable = new CompositeDisposable();

    // some channels
    var fuzz = CreateChannelStream("Fuzz", disposable);
    var channel1 = CreateChannelStream("Channel1", disposable);
    var channel2 = CreateChannelStream("Channel2", disposable);

    // the button press event streams
    var eButton1 = new Subject<Unit>();
    var eButton2 = new Subject<Unit>();

    // the button presses are projected to
    // the respective channel streams
    // note, you could obtain the channel via a function call here
    // if you wanted to - to keep it close to the slides I'm not.
    var eChan1 = eButton1.Select(_ => channel1);
    var eChan2 = eButton2.Select(_ => channel2);

    // create the selection "stream of streams"
    // an IObservable<IObservable<string>> here
    // that starts with "fuzz"
    var sel = Observable.Merge(eChan1, eChan2).StartWith(fuzz);

    // flatten and select the most recent stream with Switch
    var screen = sel.Switch();

    // subscribe to the screen and print the frames
    // it will start with "fuzz"
    disposable.Add(screen.Subscribe(Console.WriteLine));

    bool quit = false;

    // a little test loop
    // entering 1 or 2 will switch
    // to that channel
    while(!quit)
    {
        var chan = Console.ReadLine();
        switch (chan.ToUpper())
        {
            case "1":
                // raise a button 1 event
                eButton1.OnNext(Unit.Default);
                break;
            case "2":
                // raise a button 2 event
                eButton2.OnNext(Unit.Default);
                break;  
            case "Q":
                quit = true;
                break;                
        }
    }         

    disposable.Dispose();
}
Run Code Online (Sandbox Code Playgroud)