使用Reactive Extensions(Rx)创建REST客户端API

Jon*_*esø 12 .net system.reactive

我试图了解Reactive Extensions(Rx)的正确用例.不断出现的示例是UI事件(拖放,绘图),以及Rx适用于异步应用程序/操作(如Web服务调用)的建议.

我正在开发一个应用程序,我需要为REST服务编写一个小客户端API.我需要调用四个REST端点,三个用于获取一些参考数据(机场,航空公司和状态),第四个是主要服务,它将为您提供给定机场的飞行时间.

我创建了暴露三个参考数据服务的类,方法如下所示:

public Observable<IEnumerable<Airport>> GetAirports()
public Observable<IEnumerable<Airline>> GetAirlines()
public Observable<IEnumerable<Status>> GetStatuses()
public Observable<IEnumerable<Flights>> GetFlights(string airport)
Run Code Online (Sandbox Code Playgroud)

在我的GetFlights方法中,我希望每个航班都能在其离开的机场和航空公司的航班上保留参考.为此,我需要GetAirports和GetAirlines的数据可用.每个机场,航空公司和状态将被添加到Dictionar(ie.e Dictionary),以便我可以在解析每个航班时轻松设置参考.

flight.Airport = _airports[flightNode.Attribute("airport").Value]
flight.Airline = _airlines[flightNode.Attribute("airline").Value]
flight.Status = _statuses[flightNode.Attribute("status").Value]
Run Code Online (Sandbox Code Playgroud)

我现在的实现现在看起来像这样:

public IObservable<IEnumerable<Flight>> GetFlightsFrom(Airport fromAirport)
{
    var airports = new AirportNamesService().GetAirports();
    var airlines = new AirlineNamesService().GetAirlines();
    var statuses = new StatusService().GetStautses();


    var referenceData = airports
        .ForkJoin(airlines, (allAirports, allAirlines) =>
                            {
                                Airports.AddRange(allAirports);
                                Airlines.AddRange(allAirlines);
                                return new Unit();
                            })
        .ForkJoin(statuses, (nothing, allStatuses) =>
                            {
                                Statuses.AddRange(allStatuses);
                                return new Unit();
                            });

    string url = string.Format(_serviceUrl, 1, 7, fromAirport.Code);

    var flights = from data in referenceData
                    from flight in GetFlightsFrom(url)
                    select flight;

    return flights;
}

private IObservable<IEnumerable<Flight>> GetFlightsFrom(string url)
{
    return WebRequestFactory.GetData(new Uri(url), ParseFlightsXml);
}
Run Code Online (Sandbox Code Playgroud)

当前的实现是基于谢尔盖的回答,并使用ForkJoin保证顺序执行,而且我引用数据获取航班之前加载.与我之前的实现一样,必须触发"ReferenceDataLoaded"事件,这个实现更加优雅.

Ser*_*hov 2

我认为,如果您从每个 REST 调用接收实体列表,您的调用应该有一点不同的签名 - 您不是在观察返回集合中的每个值,而是在观察调用完成的事件。所以对于机场来说,它应该有签名:

public IObservable<Aiports> GetAirports()
Run Code Online (Sandbox Code Playgroud)

下一步是并行运行前三个并等待所有它们:

var ports_lines_statuses = 
    Observable.ForkJoin(GetAirports(), GetAirlines(), GetStatuses());
Run Code Online (Sandbox Code Playgroud)

第三步是将上述可观察值与 GetFlights() 组合起来:

var decoratedFlights = 
  from pls in ports_lines_statuses
  let airport = MyAirportFunc(pls)
  from flight in GetFlights(airport)
  select flight;
Run Code Online (Sandbox Code Playgroud)

编辑:我仍然不明白为什么你的服务返回

IObservable<Airport> 
Run Code Online (Sandbox Code Playgroud)

代替

IObservable<IEnumerable<Airport>>
Run Code Online (Sandbox Code Playgroud)

AFAIK,从 REST 调用中,您可以立即获取所有实体 - 但也许您会进行分页?无论如何,如果你想让 RX 进行缓冲,你可以使用 .BufferWithCount() :

    var allAirports = new AirportNamesService()
        .GetAirports().BufferWithCount(int.MaxValue); 
...
Run Code Online (Sandbox Code Playgroud)

然后你可以应用ForkJoin:

var ports_lines_statuses =  
    allAirports
        .ForkJoin(allAirlines, PortsLinesSelector)
        .ForkJoin(statuses, ...
Run Code Online (Sandbox Code Playgroud)

ports_lines_statuses 将包含时间线上的单个事件,其中包含所有参考数据。

编辑:这是另一个,使用新创建的 ListObservable (仅限最新版本):

allAiports = airports.Start(); 
allAirlines = airlines.Start();
allStatuses = statuses.Start();

...
whenReferenceDataLoaded =
  Observable.Join(airports.WhenCompleted()
                 .And(airlines.WhenCompleted())
                 .And(statuses.WhenCompleted())
                 Then((p, l, s) => new Unit())); 



    public static IObservable<Unit> WhenCompleted<T>(this IObservable<T> source)
    {
        return source
            .Materialize()
            .Where(n => n.Kind == NotificationKind.OnCompleted)
            .Select(_ => new Unit());
    }
Run Code Online (Sandbox Code Playgroud)