如何使用Rx通过异步WCF服务轮询图像

Jac*_*eja 1 .net system.reactive

我有一个异步WCF服务,它采用"URI"并返回一个图像(作为流).

我想要做的是:

  • 如果没有创建,则确保存在有效的WCF通道
  • 进行异步服务调用
  • 成功时将图像保存到成员变量
  • 如果我收到异常,请关闭频道
  • 无论是失败还是成功,等待200ms然后重新开始(永久循环或直到取消)

到目前为止,我已经想出了这个怪物:

    private void PollImage(string imageUri)
    {
        const int pollingHertz = 1;
        const int millisecondsTimeout = 1000 / pollingHertz;
        Thread.Sleep(millisecondsTimeout);

        if (_channel == null)
        {
            _channel = _channelFactory.CreateChannel();
        }

        var getImageFunc = Observable.FromAsyncPattern<string, Stream>
                                  (_channel.BeginGetImage, _channel.EndGetImage);

        getImageFunc(imageUri)
            .Finally(() => PollImage(imageUri))
            .Subscribe(
                stream => UpdateImageStream(imageUri, stream),
                ex =>
                    {
                        Trace.TraceError(ex.ToString());
                        ((ICommunicationObject) _channel).CloseOrAbort();
                        _channel = null;
                    });
    }
Run Code Online (Sandbox Code Playgroud)

我真的很想学习Rx但是每次尝试都会让我不知所措.

有人愿意给我一些指示吗?谢谢

Eni*_*ity 8

我有一个解决方案,但我建议改变你的PollImage方法,使其更像Rx.

签名应如下所示:

IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval)
Run Code Online (Sandbox Code Playgroud)

您应该考虑PollImage成为一个可观察的工厂,在您订阅返回的observable之前,它实际上不会轮询图像.这种方法的优点是它可以取消订阅 - 你的最后一个要点需要这个 - 并且它干净地分离了轮询图像的代码和更新局部变量的代码.

那么,对PollImage当时的调用看起来像这样:

PollImage(imageUri, TimeSpan.FromMilliseconds(200.0))
    .Subscribe(image =>
    {
        /* do save/update images here */
    });
Run Code Online (Sandbox Code Playgroud)

实现看起来像这样:

private IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval)
{
    Func<Stream, Image> getImageFromStream = st =>
    {
        /* read image from stream here */
    };

    return Observable.Create<Image>(o =>
    {
        if (_channel == null)
        {
            _channel = _channelFactory.CreateChannel();
        }

        var getImageFunc =
            Observable
                .FromAsyncPattern<string, Stream>(
                    _channel.BeginGetImage,
                    _channel.EndGetImage);

        var query =
            from ts in Observable.Timer(gapInterval)
            from stream in getImageFunc(imageUri)
            from img in Observable.Using(
                () => stream,
                st => Observable.Start(
                    () => getImageFromStream(st)))
            select img;

        return query.Do(img => { }, ex =>
        {
            Trace.TraceError(ex.ToString());
            ((ICommunicationObject)_channel).CloseOrAbort();
            _channel = null;
        }).Repeat().Retry().Subscribe(o);                   
    });
}
Run Code Online (Sandbox Code Playgroud)

query观察的等待,直到gapInterval完成,然后调用WCF函数返回流,然后将数据流转换为图像.

内部return陈述做了很多事情.

首先,它使用Do运算符捕获发生的任何异常,并像以前一样进行跟踪和通道重置.

接下来,它调用.Repeat()以使查询有效地重新运行,使其gapInterval在再次调用webservice之前等待.我本来可以使用Observable.Interval而不是Observable.Timerin query和drop来调用.Repeat(),但是这意味着对webservice的调用每次启动gapInterval而不是在上次完成之后等待很长时间.

接下来,.Retry()如果遇到异常,则调用有效地重新启动observable,以便订阅者永远不会看到异常.该Do运营商抓住了错误,所以这是确定.

最后,它订阅观察者并返回IDisposable允许调用代码取消订阅.

除了实现这个getImageFromStream功能之外,就是这个.

现在谨慎一点.很多人误解了订阅观察者的工作方式,这可能导致很难发现错误.

以此为例:

var xs = Observable.Interval(TimeSpan.FromSeconds(1.0));

var s1 = xs.Subscribe(x => { });
var s2 = xs.Subscribe(x => { });
Run Code Online (Sandbox Code Playgroud)

这两个s1s2订阅xs,但不是分享他们各自创建一个定时器一个计时器.您有两个Observable.Interval创建的内部工作实例,而不是一个.

现在这是observables的正确行为.如果一个失败,那么另一个不会因为他们不共享任何内部 - 他们彼此隔离.

但是,在您的代码中(以及我的代码),您可能存在潜在的线程问题,因为您共享_channel多个调用PollImage.如果一个呼叫失败,它将重置该信道,这可能导致并发呼叫因此失败.

我的建议是,为每个调用创建一个新通道,以防止出现并发问题.

我希望这有帮助.