Cel*_*Cel 10 c# multithreading asynchronous system.reactive
使用Rx,我希望在以下代码中暂停和恢复功能:
static IDisposable _subscription;
static void Main(string[] args)
{
Subscribe();
Thread.Sleep(500);
// Second value should not be shown after two seconds:
Pause();
Thread.Sleep(5000);
// Continue and show second value and beyond now:
Resume();
}
static void Subscribe()
{
var list = new List<int> { 1, 2, 3, 4, 5 };
var obs = list.ToObservable();
_subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
{
Console.WriteLine(p.ToString());
Thread.Sleep(2000);
},
err => Console.WriteLine("Error"),
() => Console.WriteLine("Sequence Completed")
);
}
static void Pause()
{
// Pseudocode:
//_subscription.Pause();
}
static void Resume()
{
// Pseudocode:
//_subscription.Resume();
}
Run Code Online (Sandbox Code Playgroud)
我相信我可以使用某种布尔字段选通与线程锁定相结合(Monitor.Wait和Monitor.Pulse)
但是有没有一个Rx运算符或其他一些反应速记来达到同样的目的?
Eni*_*ity 12
这是一个相当简单的Rx方式来做你想要的.我创建了一个名为的扩展方法Pausable,它接受一个source observable和一个第二个可观察的boolean,它可以暂停或恢复observable.
public static IObservable<T> Pausable<T>(
this IObservable<T> source,
IObservable<bool> pauser)
{
return Observable.Create<T>(o =>
{
var paused = new SerialDisposable();
var subscription = Observable.Publish(source, ps =>
{
var values = new ReplaySubject<T>();
Func<bool, IObservable<T>> switcher = b =>
{
if (b)
{
values.Dispose();
values = new ReplaySubject<T>();
paused.Disposable = ps.Subscribe(values);
return Observable.Empty<T>();
}
else
{
return values.Concat(ps);
}
};
return pauser.StartWith(false).DistinctUntilChanged()
.Select(p => switcher(p))
.Switch();
}).Subscribe(o);
return new CompositeDisposable(subscription, paused);
});
}
Run Code Online (Sandbox Code Playgroud)
它可以像这样使用:
var xs = Observable.Generate(
0,
x => x < 100,
x => x + 1,
x => x,
x => TimeSpan.FromSeconds(0.1));
var bs = new Subject<bool>();
var pxs = xs.Pausable(bs);
pxs.Subscribe(x => { /* Do stuff */ });
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Run Code Online (Sandbox Code Playgroud)
使用Pause&Resume方法将它放在代码中应该相当容易.
gar*_*rik -1
它只是有效:
\n\n class SimpleWaitPulse\n {\n static readonly object _locker = new object();\n static bool _go;\n\n static void Main()\n { // The new thread will block\n new Thread (Work).Start(); // because _go==false.\n\n Console.ReadLine(); // Wait for user to hit Enter\n\n lock (_locker) // Let\'s now wake up the thread by\n { // setting _go=true and pulsing.\n _go = true;\n Monitor.Pulse (_locker);\n }\n }\n\n static void Work()\n {\n lock (_locker)\n while (!_go)\n Monitor.Wait (_locker); // Lock is released while we\xe2\x80\x99re waiting\n\n Console.WriteLine ("Woken!!!");\n }\n }\nRun Code Online (Sandbox Code Playgroud)\n\n请参阅如何使用等待和脉冲了解更多详细信息
\n