Using Rx to simplify asynchronous Silverlight web service requests

Fla*_*DOA 4 silverlight wcf asynchronous system.reactive

I've written a simplified Silverlight client library for my WCF web service using Rx, but I've noticed that I sometimes miss the completed event.

public IObservable<XElement> GetReport(string reportName)
{
    return from client in Observable.Return(new WebServiceClient())
           from request in Observable.ToAsync<string>(client.GetReportDataAsync)(reportName)
           from result in Observable.FromEvent<GetReportDataCompletedEventArgs>(client, "GetReportDataCompleted").Take(1)
           from close in this.CloseClient(client)
           select result.EventArgs.Result;
}

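I think the problem is that the web service call is made, and returns, before the subscription to the completed event is in place. I can't figure out how to get Rx to subscribe to the event before the asynchronous call is started. I tried StartWith, but that requires the input and output types to be the same. Any ideas?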

Fla*_*DOA 7

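It seems the best answer is to use Observable.CreateWithDisposable(), which lets you subscribe to the completed event first and only then start the asynchronous call.

For example: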

public IObservable<XElement> GetReport(string reportName)
{
    return from client in Observable.Return(new WebServiceClient())
            from completed in Observable.CreateWithDisposable<GetReportDataCompletedEventArgs>(observer =>
                {
                    var subscription = Observable.FromEvent<GetReportDataCompletedEventArgs>(client, "GetReportDataCompleted")
                        .Take(1)
                        .Select(e => e.EventArgs)
                        .Subscribe(observer);
                    client.GetReportDataAsync(reportName);
                    return subscription;
                })
            from close in this.CloseClient(client)
            select completed.Result;
}
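
Why the ordering matters can be seen without Rx at all. The following is only a minimal sketch: `FakeClient` is a hypothetical stand-in for the generated WCF client (not part of my library), and it raises its completed event synchronously to exaggerate the race I suspected in the question.

```csharp
using System;

// Hypothetical stand-in for a generated WCF client; not part of the original code.
class FakeClient
{
    public event EventHandler<EventArgs> GetReportDataCompleted;

    // Raises the completed event immediately, simulating a call that finishes
    // before the caller has had a chance to attach a handler.
    public void GetReportDataAsync(string reportName)
    {
        var handler = GetReportDataCompleted;
        if (handler != null) handler(this, EventArgs.Empty);
    }
}

static class Program
{
    static void Main()
    {
        // Start first, subscribe second: the event has already fired.
        var late = new FakeClient();
        bool lateSawEvent = false;
        late.GetReportDataAsync("sales");
        late.GetReportDataCompleted += (s, e) => lateSawEvent = true;

        // Subscribe first, start second: the handler is in place in time.
        var early = new FakeClient();
        bool earlySawEvent = false;
        early.GetReportDataCompleted += (s, e) => earlySawEvent = true;
        early.GetReportDataAsync("sales");

        Console.WriteLine("start-then-subscribe saw event: " + lateSawEvent);   // False
        Console.WriteLine("subscribe-then-start saw event: " + earlySawEvent);  // True
    }
}
```

The CreateWithDisposable version above follows the second pattern: the event subscription is created inside the factory, and the service call is only started afterwards.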

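To make this easier to use, I refactored the CreateWithDisposable call into a common function that can be used with all of my web service calls, including automatically determining the event name from the event args type: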

private IObservable<T> CallService<T>(ICommunicationObject serviceClient, Action start) where T : AsyncCompletedEventArgs
{
    if (typeof(T) == typeof(AsyncCompletedEventArgs))
    {
        throw new InvalidOperationException("Event arguments type cannot be used to determine event name, use event name overload instead.");
    }

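    // Strip the "EventArgs" suffix, e.g. GetReportDataCompletedEventArgs -> GetReportDataCompleted.
    // (The built-in string.TrimEnd takes characters to trim, not a suffix string.)
    const string suffix = "EventArgs";
    string typeName = typeof(T).Name;
    string completedEventName = typeName.EndsWith(suffix, StringComparison.Ordinal)
        ? typeName.Substring(0, typeName.Length - suffix.Length)
        : typeName;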
    return CallService<T>(serviceClient, start, completedEventName);
}

private IObservable<T> CallService<T>(ICommunicationObject serviceClient, Action start, string completedEventName) where T : AsyncCompletedEventArgs
{
    return Observable.CreateWithDisposable<T>(observer =>
    {
        var subscription = Observable.FromEvent<T>(serviceClient, completedEventName).Take(1).Select(e => e.EventArgs).Subscribe(observer);
        start();
        return subscription;
    });
}

// Example usage:
public IObservable<XElement> GetReport(string reportName)
{
    return from client in Observable.Return(new WebServiceClient())
            from completed in this.CallService<GetReportDataCompletedEventArgs>(client, () => client.GetReportDataAsync(reportName))
            from close in this.CloseClient(client)
            select completed.Result;
}

/// <summary>
/// Asynchronously closes the web service client
/// </summary>
/// <param name="client">The web service client to be closed.</param>
/// <returns>Returns a cold observable sequence that yields the single CloseCompleted event args.</returns>
private IObservable<AsyncCompletedEventArgs> CloseClient(WebServiceClient client)
{
    return this.CallService<AsyncCompletedEventArgs>(client, client.CloseAsync, "CloseCompleted");
}
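
The naming convention that the first CallService overload relies on can be checked in isolation. This is only a sketch under assumptions: `EventNames.For` is a hypothetical helper name, and `GetReportDataCompletedEventArgs` is declared here by hand to mirror what a generated proxy would define.

```csharp
using System;
using System.ComponentModel;

// Hypothetical event args type, declared by hand to mirror a generated WCF proxy type.
class GetReportDataCompletedEventArgs : AsyncCompletedEventArgs
{
    public GetReportDataCompletedEventArgs() : base(null, false, null) { }
}

static class EventNames
{
    // Derives the event name by removing the "EventArgs" suffix from the type name.
    public static string For<T>() where T : AsyncCompletedEventArgs
    {
        // The base type carries no operation name, so there is nothing to derive.
        if (typeof(T) == typeof(AsyncCompletedEventArgs))
        {
            throw new InvalidOperationException("Use an explicit event name instead.");
        }

        const string suffix = "EventArgs";
        string name = typeof(T).Name;
        return name.EndsWith(suffix, StringComparison.Ordinal)
            ? name.Substring(0, name.Length - suffix.Length)
            : name;
    }
}

static class Program
{
    static void Main()
    {
        // Prints "GetReportDataCompleted"
        Console.WriteLine(EventNames.For<GetReportDataCompletedEventArgs>());
    }
}
```

This is also why CloseClient has to pass "CloseCompleted" explicitly: its event uses the plain AsyncCompletedEventArgs type, which says nothing about the operation.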

Hope this helps someone else!