Jon*_*esø 12 .net system.reactive
我试图了解Reactive Extensions(Rx)的正确用例.不断出现的示例是UI事件(拖放,绘图),以及Rx适用于异步应用程序/操作(如Web服务调用)的建议.
我正在开发一个应用程序,我需要为REST服务编写一个小客户端API.我需要调用四个REST端点,三个用于获取一些参考数据(机场,航空公司和状态),第四个是主要服务,它将为您提供给定机场的飞行时间.
我创建了暴露三个参考数据服务的类,方法如下所示:
public Observable<IEnumerable<Airport>> GetAirports()
public Observable<IEnumerable<Airline>> GetAirlines()
public Observable<IEnumerable<Status>> GetStatuses()
public Observable<IEnumerable<Flights>> GetFlights(string airport)
Run Code Online (Sandbox Code Playgroud)
在我的GetFlights方法中,我希望每个航班都能在其离开的机场和航空公司的航班上保留参考.为此,我需要GetAirports和GetAirlines的数据可用.每个机场,航空公司和状态将被添加到Dictionar(ie.e Dictionary),以便我可以在解析每个航班时轻松设置参考.
flight.Airport = _airports[flightNode.Attribute("airport").Value]
flight.Airline = _airlines[flightNode.Attribute("airline").Value]
flight.Status = _statuses[flightNode.Attribute("status").Value]
Run Code Online (Sandbox Code Playgroud)
我现在的实现现在看起来像这样:
public IObservable<IEnumerable<Flight>> GetFlightsFrom(Airport fromAirport)
{
var airports = new AirportNamesService().GetAirports();
var airlines = new AirlineNamesService().GetAirlines();
var statuses = new StatusService().GetStautses();
var referenceData = airports
.ForkJoin(airlines, (allAirports, allAirlines) =>
{
Airports.AddRange(allAirports);
Airlines.AddRange(allAirlines);
return new Unit();
})
.ForkJoin(statuses, (nothing, allStatuses) =>
{
Statuses.AddRange(allStatuses);
return new Unit();
});
string url = string.Format(_serviceUrl, 1, 7, fromAirport.Code);
var flights = from data in referenceData
from flight in GetFlightsFrom(url)
select flight;
return flights;
}
private IObservable<IEnumerable<Flight>> GetFlightsFrom(string url)
{
return WebRequestFactory.GetData(new Uri(url), ParseFlightsXml);
}
Run Code Online (Sandbox Code Playgroud)
当前的实现是基于谢尔盖的回答,并使用ForkJoin保证顺序执行,而且我引用数据获取航班之前加载.与我之前的实现一样,必须触发"ReferenceDataLoaded"事件,这个实现更加优雅.
我认为,如果您从每个 REST 调用接收实体列表,您的调用应该有一点不同的签名 - 您不是在观察返回集合中的每个值,而是在观察调用完成的事件。所以对于机场来说,它应该有签名:
public IObservable<Aiports> GetAirports()
Run Code Online (Sandbox Code Playgroud)
下一步是并行运行前三个并等待所有它们:
var ports_lines_statuses =
Observable.ForkJoin(GetAirports(), GetAirlines(), GetStatuses());
Run Code Online (Sandbox Code Playgroud)
第三步是将上述可观察值与 GetFlights() 组合起来:
var decoratedFlights =
from pls in ports_lines_statuses
let airport = MyAirportFunc(pls)
from flight in GetFlights(airport)
select flight;
Run Code Online (Sandbox Code Playgroud)
编辑:我仍然不明白为什么你的服务返回
IObservable<Airport>
Run Code Online (Sandbox Code Playgroud)
代替
IObservable<IEnumerable<Airport>>
Run Code Online (Sandbox Code Playgroud)
AFAIK,从 REST 调用中,您可以立即获取所有实体 - 但也许您会进行分页?无论如何,如果你想让 RX 进行缓冲,你可以使用 .BufferWithCount() :
var allAirports = new AirportNamesService()
.GetAirports().BufferWithCount(int.MaxValue);
...
Run Code Online (Sandbox Code Playgroud)
然后你可以应用ForkJoin:
var ports_lines_statuses =
allAirports
.ForkJoin(allAirlines, PortsLinesSelector)
.ForkJoin(statuses, ...
Run Code Online (Sandbox Code Playgroud)
ports_lines_statuses 将包含时间线上的单个事件,其中包含所有参考数据。
编辑:这是另一个,使用新创建的 ListObservable (仅限最新版本):
allAiports = airports.Start();
allAirlines = airlines.Start();
allStatuses = statuses.Start();
...
whenReferenceDataLoaded =
Observable.Join(airports.WhenCompleted()
.And(airlines.WhenCompleted())
.And(statuses.WhenCompleted())
Then((p, l, s) => new Unit()));
public static IObservable<Unit> WhenCompleted<T>(this IObservable<T> source)
{
return source
.Materialize()
.Where(n => n.Kind == NotificationKind.OnCompleted)
.Select(_ => new Unit());
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5807 次 |
| 最近记录: |