Wil*_*ans 5 c# repository observablecollection reactive-programming system.reactive
我正在常见的场景中工作,我希望访问存储库的子集,而不用担心必须保持更新,例如“获取价格大于 10 的所有订单”。我已经实现了一个解决方案,但有两个问题(列在最后)。
存储库的子集可以通过相当于的东西来实现
var expensiveOrders = Repository.GetOrders().Where(o => o.Price > 10);
但这是一个IEnumerable,不会在原始集合更新时更新。我可以添加 的处理程序CollectionChanged,但如果我们想访问更多子集怎么办?
var expensiveOrdersFromBob = expensiveOrders.Where(o => o.Name == Bob);
Run Code Online (Sandbox Code Playgroud)
我们还必须连接一个集合 - 也为这个集合进行了更改。实时更新的概念让我想到了 Rx,因此我开始构建一个ObservableCache,其中包含ObservableCollection自动更新的项目和用于通知的 RX 流。(该流也是在底层更新缓存的内容。)
class ObservableCache<T> : IObservableCache<T>
{
private readonly ObservableCollection<T> _cache;
private readonly IObservable<Tuple<T, CRUDOperationType>> _updates;
public ObservableCache(IEnumerable<T> initialCache
, IObservable<Tuple<T, CRUDOperationType>> currentStream, Func<T, bool> filter)
{
_cache = new ObservableCollection<T>(initialCache.Where(filter));
_updates = currentStream.Where(tuple => filter(tuple.Item1));
_updates.Subscribe(ProcessUpdate);
}
private void ProcessUpdate(Tuple<T, CRUDOperationType> update)
{
var item = update.Item1;
lock (_cache)
{
switch (update.Item2)
{
case CRUDOperationType.Create:
_cache.Add(item);
break;
case CRUDOperationType.Delete:
_cache.Remove(item);
break;
case CRUDOperationType.Replace:
case CRUDOperationType.Update:
_cache.Remove(item); // ToDo: implement some key-based equality
_cache.Add(item);
break;
}
}
}
public ObservableCollection<T> Cache
{
get { return _cache; }
}
public IObservable<T> Updates
{
get { return _updates.Select(tuple => tuple.Item1); }
}
public IObservableCache<T> Where(Func<T, bool> predicate)
{
return new ObservableCache<T>(_cache, _updates, predicate);
}
}
Run Code Online (Sandbox Code Playgroud)
然后你可以像这样使用它:
var expensiveOrders = new ObservableCache<Order>(_orders
, updateStream
, o => o.Price > 10);
expensiveOrders.Updates.Subscribe
(o => Console.WriteLine("Got new expensive order: " + o));
_observableBoundToSomeCtrl = expensiveOrders.Cache;
var expensiveOrdersFromBob = expensiveOrders
.Where(o => o.Name == "Bob");
expensiveOrdersFromBob.Updates.Subscribe
(o => Console.WriteLine("Got new expensive order from Bob: " + o));
_observableBoundToSomeOtherCtrl = expensiveOrdersFromBob.Cache;
Run Code Online (Sandbox Code Playgroud)
等等,这个想法是,您可以继续将缓存投影到越来越窄的子集中,而不必担心它不同步。那么我的问题是什么?
ISequenceableItem接口。有没有更好的方法来做到这一点?RX 很棒,因为它可以为您处理所有线程。我想利用这一点。 您不需要锁定_cache在ProcessUpdate. 如果您的源可观察对象currentStream遵循 Rx 准则,那么您保证一次只能进行一次调用OnNext。换句话说,当您仍在处理前一个值时,您将不会从流中接收另一个值。
解决竞争条件的唯一可靠方法是确保在updateStream开始生成数据之前创建缓存。
您可能想查看Reactive Extensions (Rxx) 的扩展。我相信 Dave 已经构建了许多用于将 UI 控件绑定到可观察数据的实用程序。文档很少。我不知道你正在做的事情是否有任何意义。
| 归档时间: |
|
| 查看次数: |
2423 次 |
| 最近记录: |