使用Reactive Extensions(RX),是否可以添加"暂停"命令?

Con*_*ngo 5 .net c# reactive-programming system.reactive rx.net

我有一个类接收事件流,并推出另一个事件流.

所有事件都使用Reactive Extensions(RX).传入的事件流从外部源推送到IObserver<T>使用中.OnNext,并使用IObservable<T>和推出传出的事件流.Subscribe.我Subject<T>在幕后用来管理这个.

我想知道RX中有什么技术暂时暂停输出.这意味着传入事件将在内部队列中累积,当它们取消暂停时,事件将再次流出.

ion*_*noy 4

这是我使用缓冲区和窗口运算符的解决方案:

public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser)
{
    var queue = source.Buffer(pauser.Where(toPause => !toPause),
                              _ => pauser.Where(toPause => toPause))
                      .SelectMany(l => l.ToObservable());

    return source.Window(pauser.Where(toPause => toPause).StartWith(true), 
                         _ => pauser.Where(toPause => !toPause))
                 .Switch()
                 .Merge(queue);
}
Run Code Online (Sandbox Code Playgroud)

窗口在订阅时打开,每次从暂停流收到“true”时。当暂停器提供“假”值时它会关闭。

Buffer 做了它应该做的事情,缓冲来自暂停器的“false”和“true”之间的值。一旦 Buffer 接收到“true”,它就会输出立即流式传输的值的 IList。

DotNetFiddle 链接: https: //dotnetfiddle.net/vGU5dJ