Pausing and resuming a subscription on a cold IObservable

Cel*_*Cel 10 c# multithreading asynchronous system.reactive

Using Rx, I would like pause and resume functionality in the following code:

How can Pause() and Resume() be implemented?

    static IDisposable _subscription;

    static void Main(string[] args)
    {
        Subscribe();
        Thread.Sleep(500);
        // Second value should not be shown after two seconds:
        Pause();
        Thread.Sleep(5000);
        // Continue and show second value and beyond now:
        Resume();
    }

    static void Subscribe()
    {
        var list = new List<int> { 1, 2, 3, 4, 5 };
        var obs = list.ToObservable();
        _subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
        {
            Console.WriteLine(p.ToString());
            Thread.Sleep(2000);
        },
        err => Console.WriteLine("Error"),
        () => Console.WriteLine("Sequence Completed")
        );
    }

    static void Pause()
    {
        // Pseudocode:
        //_subscription.Pause();
    }

    static void Resume()
    {
        // Pseudocode:
        //_subscription.Resume();
    }

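An Rx solution?

  • I believe I could make it work with some kind of Boolean field gating combined with thread locking (Monitor.Wait and Monitor.Pulse).

  • But is there an Rx operator or some other reactive shorthand that achieves the same thing?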
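
For comparison, the Boolean-plus-Monitor idea from the first bullet could look roughly like the sketch below. PauseGate and its members are hypothetical names, not part of Rx. The gate is meant to be checked inside the OnNext handler, so it blocks the thread that delivers values rather than pausing the subscription itself.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not part of Rx): a reusable gate built on
// Monitor.Wait and Monitor.PulseAll.
public sealed class PauseGate
{
    private readonly object _locker = new object();
    private bool _paused;

    public bool IsPaused
    {
        get { lock (_locker) { return _paused; } }
    }

    // Close the gate: later WaitIfPaused calls will block.
    public void Pause()
    {
        lock (_locker) { _paused = true; }
    }

    // Open the gate and wake every thread waiting on it.
    public void Resume()
    {
        lock (_locker)
        {
            _paused = false;
            Monitor.PulseAll(_locker);
        }
    }

    // Blocks the calling thread for as long as the gate is closed.
    public void WaitIfPaused()
    {
        lock (_locker)
        {
            while (_paused)
                Monitor.Wait(_locker); // the lock is released while waiting
        }
    }
}
```

Under these assumptions, the handler in Subscribe() would call gate.WaitIfPaused() as its first statement, and Pause() and Resume() would forward to the gate. A value already being processed when Pause() is called still finishes; the next one is held until Resume().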

Eni*_*ity 12

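Here is a fairly simple Rx way to do what you want. I've created an extension method called Pausable that takes a source observable and a second observable of bool, which pauses or resumes the source: true pauses, false resumes.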

public static IObservable<T> Pausable<T>(
    this IObservable<T> source,
    IObservable<bool> pauser)
{
    return Observable.Create<T>(o =>
    {
        var paused = new SerialDisposable();
        var subscription = Observable.Publish(source, ps =>
        {
            var values = new ReplaySubject<T>();
            Func<bool, IObservable<T>> switcher = b =>
            {
                if (b)
                {
                    values.Dispose();
                    values = new ReplaySubject<T>();
                    paused.Disposable = ps.Subscribe(values);
                    return Observable.Empty<T>();
                }
                else
                {
                    return values.Concat(ps);
                }
            };

            return pauser.StartWith(false).DistinctUntilChanged()
                .Select(p => switcher(p))
                .Switch();
        }).Subscribe(o);
        return new CompositeDisposable(subscription, paused);
    });
}

It can be used like this:

var xs = Observable.Generate(
    0,
    x => x < 100,
    x => x + 1,
    x => x,
    x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);

It should be fairly easy to put this into your code with the Pause and Resume methods.
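
One way to wire that up, as a rough sketch: wrap the Subject<bool> in a small class that exposes Pause() and Resume(). PauseSwitch is a hypothetical name introduced here for illustration; it assumes the System.Reactive package and follows the same convention as Pausable above (true pauses, false resumes).

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical wrapper: exposes Pause()/Resume() plus the bool stream
// that a pauser parameter expects.
public sealed class PauseSwitch : IDisposable
{
    private readonly Subject<bool> _signal = new Subject<bool>();

    // true = paused, false = running.
    public IObservable<bool> Signal
    {
        get { return _signal.AsObservable(); }
    }

    public void Pause()
    {
        _signal.OnNext(true);
    }

    public void Resume()
    {
        _signal.OnNext(false);
    }

    public void Dispose()
    {
        _signal.Dispose();
    }
}
```

With a static PauseSwitch field, the question's Subscribe() would presumably call obs.Pausable(pauseSwitch.Signal) before subscribing, and the static Pause() and Resume() methods would forward to the switch. This has not been run against the question's exact code.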


gar*_*rik -1

It just works:

    class SimpleWaitPulse
    {
      static readonly object _locker = new object();
      static bool _go;

      static void Main()
      {                                // The new thread will block
        new Thread (Work).Start();     // because _go==false.

        Console.ReadLine();            // Wait for user to hit Enter

        lock (_locker)                 // Let's now wake up the thread by
        {                              // setting _go=true and pulsing.
          _go = true;
          Monitor.Pulse (_locker);
        }
      }

      static void Work()
      {
        lock (_locker)
          while (!_go)
            Monitor.Wait (_locker);    // Lock is released while we're waiting

        Console.WriteLine ("Woken!!!");
      }
    }

See "How to Use Wait and Pulse" for more details.
