Mar*_*age 9 .net c# system.reactive
我正在尝试使用Reactive Extensions for .NET重写一些代码,但我需要一些关于如何实现目标的指导.
我有一个类在低级库中封装了一些异步行为.想想要读取或写入网络的东西.当类启动时,它将尝试连接到环境,并且当成功时,它将通过从工作线程调用来发回信号.
我想将此异步行为转换为同步调用,并在下面创建了一个极为简化的示例,说明如何实现:
ManualResetEvent readyEvent = new ManualResetEvent(false);
public void Start(TimeSpan timeout) {
// Simulate a background process
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Wait for startup to complete.
if (!this.readyEvent.WaitOne(timeout))
throw new TimeoutException();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay); // Simulate startup delay.
this.readyEvent.Set();
}
Run Code Online (Sandbox Code Playgroud)
AsyncStart在工作线程上运行只是模拟库的异步行为的一种方式,并不是我的实际代码的一部分,其中低级库提供线程并在回调上调用我的代码.
请注意,该Start方法将TimeoutException在超时间隔内抛出if start.
我想重写此代码以使用Rx.这是我的第一次尝试:
Subject<Unit> readySubject = new Subject<Unit>();
public void Start(TimeSpan timeout) {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point A - see below
this.readySubject.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
Run Code Online (Sandbox Code Playgroud)
这是一次不错的尝试,但不幸的是它包含了竞争条件.如果启动快速完成(例如,如果delay为0)并且如果在A点处存在额外延迟,则在执行之前OnNext将调用.从本质上讲,我正在申请并且永远不会看到启动已经完成,而是会被抛出.readySubjectFirstIObservableTimeoutFirstTimeoutException
似乎Observable.Defer已经创建了处理这样的问题.这里使用Rx的尝试稍微复杂一些:
Subject<Unit> readySubject = new Subject<Unit>();
void Start(TimeSpan timeout) {
var ready = Observable.Defer(() => {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point B - see below
return this.readySubject.AsObservable();
});
ready.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
Run Code Online (Sandbox Code Playgroud)
现在,异步操作不是立即启动,而是仅在IObservable使用时启动.不幸的是,仍然存在竞争条件,但这次是在B点.如果异步操作OnNext在Deferlambda返回之前启动了调用,它仍然会丢失并且TimeoutException将抛出一个Timeout.
我知道我可以使用运算符Replay来缓冲事件,但我没有Rx的初始示例不使用任何类型的缓冲.有没有办法让我在没有竞争条件的情况下使用Rx来解决我的问题?本质上只有IObservable在这种情况下已连接到的情况下启动异步操作Timeout并且First?
根据Paul Betts的回答,这里有解决方案:
void Start(TimeSpan timeout) {
var readySubject = new AsyncSubject<Unit>();
ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
// Point C - see below
readySubject.Timeout(timeout).First();
}
void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
Thread.Sleep(delay);
readySubject.OnNext(new Unit());
readySubject.OnCompleted();
}
Run Code Online (Sandbox Code Playgroud)
有趣的是,当C点的延迟比AsyncStart完成所需的时间长.AsyncSubject保留最后发送的通知,Timeout并且First仍将按预期执行.
Ana*_*tts 12
所以,有一点了解的Rx我想很多人在第一次(包括我自己!)做:如果你使用像ResetEvents,Thread.Sleeps,或其他任何传统的线程功能,你就错了(TM ) - 它就像在LINQ中将数据转换为Arrays,因为您知道底层类型恰好是一个数组.
要知道的关键是async func由一个返回的函数表示IObservable<TResult>- 这是一个神奇的酱汁,可以让你在完成任务时发出信号.所以这就是你如何"Rx-ify"一个更传统的异步函数,就像你在Silverlight Web服务中看到的那样:
IObservable<byte[]> readFromNetwork()
{
var ret = new AsyncSubject();
// Here's a traditional async function that you provide a callback to
asyncReaderFunc(theFile, buffer => {
ret.OnNext(buffer);
ret.OnCompleted();
});
return ret;
}
Run Code Online (Sandbox Code Playgroud)
这是一次不错的尝试,但不幸的是它包含了竞争条件.
这是AsyncSubject进来的地方 - 这确保即使asyncReaderFunc胜过订阅打卡,AsyncSubject仍将"重播"发生的事情.
所以,既然我们已经拥有了我们的功能,我们可以做很多有趣的事情:
// Make it into a sync function
byte[] results = readFromNetwork().First();
// Keep reading blocks one at a time until we run out
readFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => {
Console.WriteLine("Read {0} bytes in chunk", bytes.Length);
})
// Read the entire stream and get notified when the whole deal is finished
readFromNetwork()
.Repeat().TakeUntil(x => x == null || x.Length == 0)
.Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
.Subscribe(ms => {
Console.WriteLine("Got {0} bytes in total", ms.ToArray().Length);
});
// Or just get the entire thing as a MemoryStream and wait for it
var memoryStream = readFromNetwork()
.Repeat().TakeUntil(x => x == null || x.Length == 0)
.Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
.First();
Run Code Online (Sandbox Code Playgroud)