仅在满足特定条件时才会节流

Fla*_*ack 4 c# throttling system.reactive

我有一个观察我订阅的.这个obsevable将返回一个对象,该对象具有一个名为ActivationType的属性,可以多次设置.

我想要实现的是每当ActivationType设置为"Type1"时记录一条消息.但是,如果ActivationType设置为"Type2",则只记录消息一次并等待30秒,然后再次记录,如果ActivationType为"Type2".

所以,如果我有:

myObservable
    .Where(o => o.ActivationType == "Type1" || o.ActivationType == "Type2")  //listen for types 1 and 2
    .Throttle() // ??? somehow only throttle if we are currently looking at Type2
    .Subscribe(Log); //log some stuff
Run Code Online (Sandbox Code Playgroud)

我相信Throttle()是我正在寻找的,但我不确定如何有条件地触发它.

有什么建议?

Jer*_*all 6

啊,对于几乎不可能理解的Window操作员来说,这是一个完美的案例!

编辑:我喜欢张贴了十几次了一个月,我发誓,这个链接-最好读通我见过的Window,Join,Buffer,GroupJoin,等运营商:

Lee Campbell:Rx第9部分 - 加入,窗口,缓冲和组加入

var source = new Subject<Thing>();

var feed = source.Publish().RefCount();
var ofType1 = feed.Where(t => t.ActivationType == "Type1");
var ofType2 = feed
    // only window the type2s
    .Where(t => t.ActivationType == "Type2")
    // our "end window selector" will be a tick 30s off from start
    .Window(() => Observable.Timer(TimeSpan.FromSeconds(30)))
    // we want the first one in each window...
    .Select(lst => lst.Take(1))
    // moosh them all back together
    .Merge();

    // We want all "type 1s" and the buffered outputs of "type 2s"
    var query = ofType1.Merge(ofType2);

    // Let's set up a fake stream of data
    var running = true;
    var feeder = Task.Factory.StartNew(
       () => { 
         // until we say stop...
         while(running) 
         { 
             // pump new Things into the stream every 500ms
             source.OnNext(new Thing()); 
             Thread.Sleep(500); 
         }
    });

    using(query.Subscribe(Console.WriteLine))
    {               
        // Block until we hit enter so we can see the live output 
        // from the above subscribe 
        Console.ReadLine();
        // Shutdown our fake feeder
        running = false;
        feeder.Wait();
     }
Run Code Online (Sandbox Code Playgroud)