Jac*_*eja 1 .net system.reactive
我有一个异步WCF服务,它采用"URI"并返回一个图像(作为流).
我想要做的是:
到目前为止,我已经想出了这个怪物:
private void PollImage(string imageUri)
{
const int pollingHertz = 1;
const int millisecondsTimeout = 1000 / pollingHertz;
Thread.Sleep(millisecondsTimeout);
if (_channel == null)
{
_channel = _channelFactory.CreateChannel();
}
var getImageFunc = Observable.FromAsyncPattern<string, Stream>
(_channel.BeginGetImage, _channel.EndGetImage);
getImageFunc(imageUri)
.Finally(() => PollImage(imageUri))
.Subscribe(
stream => UpdateImageStream(imageUri, stream),
ex =>
{
Trace.TraceError(ex.ToString());
((ICommunicationObject) _channel).CloseOrAbort();
_channel = null;
});
}
Run Code Online (Sandbox Code Playgroud)
我真的很想学习Rx但是每次尝试都会让我不知所措.
有人愿意给我一些指示吗?谢谢
我有一个解决方案,但我建议改变你的PollImage方法,使其更像Rx.
签名应如下所示:
IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval)
Run Code Online (Sandbox Code Playgroud)
您应该考虑PollImage成为一个可观察的工厂,在您订阅返回的observable之前,它实际上不会轮询图像.这种方法的优点是它可以取消订阅 - 你的最后一个要点需要这个 - 并且它干净地分离了轮询图像的代码和更新局部变量的代码.
那么,对PollImage当时的调用看起来像这样:
PollImage(imageUri, TimeSpan.FromMilliseconds(200.0))
.Subscribe(image =>
{
/* do save/update images here */
});
Run Code Online (Sandbox Code Playgroud)
实现看起来像这样:
private IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval)
{
Func<Stream, Image> getImageFromStream = st =>
{
/* read image from stream here */
};
return Observable.Create<Image>(o =>
{
if (_channel == null)
{
_channel = _channelFactory.CreateChannel();
}
var getImageFunc =
Observable
.FromAsyncPattern<string, Stream>(
_channel.BeginGetImage,
_channel.EndGetImage);
var query =
from ts in Observable.Timer(gapInterval)
from stream in getImageFunc(imageUri)
from img in Observable.Using(
() => stream,
st => Observable.Start(
() => getImageFromStream(st)))
select img;
return query.Do(img => { }, ex =>
{
Trace.TraceError(ex.ToString());
((ICommunicationObject)_channel).CloseOrAbort();
_channel = null;
}).Repeat().Retry().Subscribe(o);
});
}
Run Code Online (Sandbox Code Playgroud)
该query观察的等待,直到gapInterval完成,然后调用WCF函数返回流,然后将数据流转换为图像.
内部return陈述做了很多事情.
首先,它使用Do运算符捕获发生的任何异常,并像以前一样进行跟踪和通道重置.
接下来,它调用.Repeat()以使查询有效地重新运行,使其gapInterval在再次调用webservice之前等待.我本来可以使用Observable.Interval而不是Observable.Timerin query和drop来调用.Repeat(),但是这意味着对webservice的调用每次启动gapInterval而不是在上次完成之后等待很长时间.
接下来,.Retry()如果遇到异常,则调用有效地重新启动observable,以便订阅者永远不会看到异常.该Do运营商抓住了错误,所以这是确定.
最后,它订阅观察者并返回IDisposable允许调用代码取消订阅.
除了实现这个getImageFromStream功能之外,就是这个.
现在谨慎一点.很多人误解了订阅观察者的工作方式,这可能导致很难发现错误.
以此为例:
var xs = Observable.Interval(TimeSpan.FromSeconds(1.0));
var s1 = xs.Subscribe(x => { });
var s2 = xs.Subscribe(x => { });
Run Code Online (Sandbox Code Playgroud)
这两个s1与s2订阅xs,但不是分享他们各自创建一个定时器一个计时器.您有两个Observable.Interval创建的内部工作实例,而不是一个.
现在这是observables的正确行为.如果一个失败,那么另一个不会因为他们不共享任何内部 - 他们彼此隔离.
但是,在您的代码中(以及我的代码),您可能存在潜在的线程问题,因为您共享_channel多个调用PollImage.如果一个呼叫失败,它将重置该信道,这可能导致并发呼叫因此失败.
我的建议是,为每个调用创建一个新通道,以防止出现并发问题.
我希望这有帮助.