How do I create an Observable Timer that calls a method and, if the method is running, blocks on cancellation until it completes?

Mic*_*haC 3 c# system.reactive

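My requirements:

  1. Run the method DoWork at a specified interval.
  2. If stop is called between calls to DoWork, just stop the timer.
  3. If stop is called while DoWork is running, block until DoWork finishes.
  4. If DoWork takes too long to finish after stop is called, time out.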

I have a solution that seems to work so far, but I'm not very happy with it and suspect I'm missing something. Here is the void Main from my test application:

var source = new CancellationTokenSource();

// Create an observable sequence for the Cancel event.
var cancelObservable = Observable.Create<Int64>(o =>
{
    source.Token.Register(() =>
    {
        Console.WriteLine("Start on canceled handler.");
        o.OnNext(1);
        Console.WriteLine("End on canceled handler.");
    });

    return Disposable.Empty;
});

var observable =
    // Create observable timer.
    Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), Scheduler.Default)
        // Merge with the cancel observable so we have a composite that 
        // generates an event every 10 seconds AND immediately when a cancel is requested.
        .Merge(cancelObservable)
        // This is what I ended up doing instead of disposing the timer so that I could wait
        // for the sequence to finish, including DoWork.
        .TakeWhile(i => !source.IsCancellationRequested)
        // I could put this in an observer, but this way exceptions could be caught and handled
        // or the results of the work could be fed to a subscriber.
        .Do(l =>
        {
            Console.WriteLine("Start DoWork.");
            Thread.Sleep(TimeSpan.FromSeconds(5));
            Console.WriteLine("Finish DoWork.");
        });

var published = observable.Publish();

var disposable = published.Connect();

// Press key between Start DoWork and Finish DoWork to test the cancellation while
// running DoWork.
// Press key between Finish DoWork and Start DoWork to test cancellation between
// events.
Console.ReadKey();

// I doubt this is good practice, but I was finding that o.OnNext was blocking
// inside of register, and the timeout wouldn't work if I blocked here before
// I set it up.
Task.Factory.StartNew(source.Cancel);

// Is there a preferred way to block until a sequence is finished? My experience
// is there's a timing issue if Cancel finishes fast enough the sequence may already
// be finished by the time I get here and .Wait() complains that the sequence contains
// no elements.
published.Timeout(TimeSpan.FromSeconds(1))
    .ForEach(i => { });

disposable.Dispose();

Console.WriteLine("All finished! Press any key to continue.");
Console.ReadKey();

Bra*_*don 5

First, in your cancelObservable, make sure to return the result of Token.Register as your disposable instead of returning Disposable.Empty.
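As a minimal sketch of that change (the class name CancelObservableSketch and the method name FromToken are made up for illustration; the rest mirrors the question's cancelObservable), the registration can be handed back as the subscription's disposable, so that disposing the subscription unregisters the callback:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class CancelObservableSketch
{
    // Hypothetical helper; not part of Rx or of the original question.
    public static IObservable<long> FromToken(CancellationToken token)
    {
        return Observable.Create<long>(observer =>
        {
            // CancellationTokenRegistration implements IDisposable, so returning it
            // removes the callback from the token when the subscription is disposed.
            IDisposable registration = token.Register(() => observer.OnNext(1));
            return registration;
        });
    }
}
```

With Disposable.Empty, the callback would stay registered on the token even after the subscriber has gone away.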

Here is a nice extension method for converting CancellationTokens to observables:

public static IObservable<Unit> AsObservable(this CancellationToken token, IScheduler scheduler)
{
    return Observable.Create<Unit>(observer =>
    {
        var d1 = new SingleAssignmentDisposable();
        return new CompositeDisposable(d1, token.Register(() =>
            {
                d1.Disposable = scheduler.Schedule(() =>
                {
                    observer.OnNext(Unit.Default);
                    observer.OnCompleted();
                });
            }));
    });
}

Now, to your actual requirements:

public IObservable<Unit> ScheduleWork(IObservable<Unit> cancelSignal)
{
    // Performs work on an interval
    // stops the timer (but finishes any work in progress) when the cancelSignal is received
    var workTimer = Observable
        .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10))
        .TakeUntil(cancelSignal)
        .Select(_ =>
        {
            DoWork();
            return Unit.Default;
        })
        .IgnoreElements();

    // starts a timer after cancellation that will eventually throw a timeout exception.
    var timeoutAfterCancelSignal = cancelSignal
        .SelectMany(c => Observable.Never<Unit>().Timeout(TimeSpan.FromSeconds(5)));

    // Use Amb to listen to both the workTimer
    // and the timeoutAfterCancelSignal
    // Since neither produce any data we are really just
    // listening to see which will complete first.
    // if the workTimer completes before the timeout
    // then Amb will complete without error.
    // However if the timeout expires first, then Amb
    // will produce an error
    return Observable.Amb(workTimer, timeoutAfterCancelSignal);
}

// Usage
var cts = new CancellationTokenSource();
var s = ScheduleWork(cts.Token.AsObservable(Scheduler.Default));

using (var finishedSignal = new ManualResetEventSlim())
{
    s.Finally(finishedSignal.Set).Subscribe(
        _ => { /* will never be called */},
        error => { /* handle error */ },
        () => { /* canceled without error */ } );

    Console.ReadKey();
    cts.Cancel();

    finishedSignal.Wait();
}
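To see the timeout branch in isolation, here is a small sketch under stated assumptions: the class TimeoutAfterCancelSketch and its method WaitForOutcome are hypothetical names, the work sequence is passed in rather than built from a timer, and the cancel signal is fired immediately. It returns the exception that ended the sequence, or null if the work sequence completed first.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class TimeoutAfterCancelSketch
{
    // Hypothetical helper: signals cancellation right away, then blocks until
    // either `work` completes or the post-cancel timeout fires.
    public static Exception WaitForOutcome(IObservable<Unit> work, TimeSpan timeout)
    {
        var cancelSignal = new AsyncSubject<Unit>();

        // Never() produces nothing, so Timeout() will raise a TimeoutException
        // once `timeout` has elapsed after the cancel signal arrives.
        var timeoutAfterCancel = cancelSignal
            .SelectMany(_ => Observable.Never<Unit>().Timeout(timeout));

        Exception failure = null;
        using (var finished = new ManualResetEventSlim())
        using (Observable.Amb(work, timeoutAfterCancel)
            .Finally(finished.Set)
            .Subscribe(_ => { }, ex => failure = ex))
        {
            cancelSignal.OnNext(Unit.Default);
            cancelSignal.OnCompleted();
            finished.Wait(); // blocks until completion or error
        }
        return failure;
    }
}
```

Passing Observable.Never<Unit>() as the work should yield a TimeoutException, while a sequence that completes on its own (for example Observable.Empty<Unit>()) should yield null.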

Note that instead of a cancellation token, you could also do the following:

var cancelSignal = new AsyncSubject<Unit>();
var s = ScheduleWork(cancelSignal);

// .. to cancel ..
Console.ReadKey();
cancelSignal.OnNext(Unit.Default);
cancelSignal.OnCompleted();

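  • Rather than embedding "ObserveOn" in the "TakeUntil" call, I modified the "ToObservable" helper method to accept an "IScheduler" on which the cancellation is signaled when the token is canceled. I've updated the answer. (2 upvotes)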