使用Rx阻止(并可能超时)异步操作

Mar*_*age 9 .net c# system.reactive

我正在尝试使用Reactive Extensions for .NET重写一些代码,但我需要一些关于如何实现目标的指导.

我有一个类在低级库中封装了一些异步行为.想想要读取或写入网络的东西.当类启动时,它将尝试连接到环境,并且当成功时,它将通过从工作线程调用来发回信号.

我想将此异步行为转换为同步调用,并在下面创建了一个极为简化的示例,说明如何实现:

ManualResetEvent readyEvent = new ManualResetEvent(false);

public void Start(TimeSpan timeout) {
  // Simulate a background process
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Wait for startup to complete.
  if (!this.readyEvent.WaitOne(timeout))
    throw new TimeoutException();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay); // Simulate startup delay.
  this.readyEvent.Set();
}
Run Code Online (Sandbox Code Playgroud)

AsyncStart在工作线程上运行只是模拟库的异步行为的一种方式,并不是我的实际代码的一部分,其中低级库提供线程并在回调上调用我的代码.

请注意,该Start方法将TimeoutException在超时间隔内抛出if start.

我想重写此代码以使用Rx.这是我的第一次尝试:

Subject<Unit> readySubject = new Subject<Unit>();

public void Start(TimeSpan timeout) {
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Point A - see below
  this.readySubject.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}
Run Code Online (Sandbox Code Playgroud)

这是一次不错的尝试,但不幸的是它包含了竞争条件.如果启动快速完成(例如,如果delay为0)并且如果在A点处存在额外延迟,则在执行之前OnNext将调用.从本质上讲,我正在申请并且永远不会看到启动已经完成,而是会被抛出.readySubjectFirstIObservableTimeoutFirstTimeoutException

似乎Observable.Defer已经创建了处理这样的问题.这里使用Rx的尝试稍微复杂一些:

Subject<Unit> readySubject = new Subject<Unit>();

void Start(TimeSpan timeout) {
  var ready = Observable.Defer(() => {
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
    // Point B - see below
    return this.readySubject.AsObservable();
  });
  ready.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}
Run Code Online (Sandbox Code Playgroud)

现在,异步操作不是立即启动,而是仅在IObservable使用时启动.不幸的是,仍然存在竞争条件,但这次是在B点.如果异步操作OnNextDeferlambda返回之前启动了调用,它仍然会丢失并且TimeoutException将抛出一个Timeout.

我知道我可以使用运算符Replay来缓冲事件,但我没有Rx的初始示例不使用任何类型的缓冲.有没有办法让我在没有竞争条件的情况下使用Rx来解决我的问题?本质上只有IObservable在这种情况下已连接到的情况下启动异步操作Timeout并且First


根据Paul Betts的回答,这里有解决方案:

void Start(TimeSpan timeout) {
  var readySubject = new AsyncSubject<Unit>();
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
  // Point C - see below
  readySubject.Timeout(timeout).First();
}

void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
  Thread.Sleep(delay);
  readySubject.OnNext(new Unit());
  readySubject.OnCompleted();
}
Run Code Online (Sandbox Code Playgroud)

有趣的是,当C点的延迟比AsyncStart完成所需的时间长.AsyncSubject保留最后发送的通知,Timeout并且First仍将按预期执行.

Ana*_*tts 12

所以,有一点了解的Rx我想很多人在第一次(包括我自己!)做:如果你使用像ResetEvents,Thread.Sleeps,或其他任何传统的线程功能,你就错了(TM ) - 它就像在LINQ中将数据转换为Arrays,因为您知道底层类型恰好是一个数组.

要知道的关键是async func由一个返回的函数表示IObservable<TResult>- 这是一个神奇的酱汁,可以让你在完成任务时发出信号.所以这就是你如何"Rx-ify"一个更传统的异步函数,就像你在Silverlight Web服务中看到的那样:

IObservable<byte[]> readFromNetwork()
{
    var ret = new AsyncSubject();
    // Here's a traditional async function that you provide a callback to
    asyncReaderFunc(theFile, buffer => {
        ret.OnNext(buffer);
        ret.OnCompleted();
    });

    return ret;
}
Run Code Online (Sandbox Code Playgroud)

这是一次不错的尝试,但不幸的是它包含了竞争条件.

这是AsyncSubject进来的地方 - 这确保即使asyncReaderFunc胜过订阅打卡,AsyncSubject仍将"重播"发生的事情.

所以,既然我们已经拥有了我们的功能,我们可以做很多有趣的事情:

// Make it into a sync function
byte[] results = readFromNetwork().First();

// Keep reading blocks one at a time until we run out
readFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => {
    Console.WriteLine("Read {0} bytes in chunk", bytes.Length);
})

// Read the entire stream and get notified when the whole deal is finished
readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
    .Subscribe(ms => {
        Console.WriteLine("Got {0} bytes in total", ms.ToArray().Length);
    });

// Or just get the entire thing as a MemoryStream and wait for it
var memoryStream = readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
    .First();
Run Code Online (Sandbox Code Playgroud)