Ale*_*tof 20 .net c# system.reactive
我的同事和我有争议.我们正在编写一个处理海量数据的.NET应用程序.它接收数据元素,根据某些标准将它们的子集分成块,并处理这些块.
假设我们有类型的数据项Foo逐个到达某些来源(例如,来自网络).我们希望收集子集类型的相关对象Foo,构造类型的对象Bar从类型的每个这样的子集和处理的对象Bar.
我们其中一个人提出了以下设计.它的主题是IObservable<T>直接从我们组件的接口公开对象.
// ********* Interfaces **********
interface IFooSource
{
// this is the event-stream of objects of type Foo
IObservable<Foo> FooArrivals { get; }
}
interface IBarSource
{
// this is the event-stream of objects of type Bar
IObservable<Bar> BarArrivals { get; }
}
/ ********* Implementations *********
class FooSource : IFooSource
{
// Here we put logic that receives Foo objects from the network and publishes them to the FooArrivals event stream.
}
class FooSubsetsToBarConverter : IBarSource
{
IFooSource fooSource;
IObservable<Bar> BarArrivals
{
get
{
// Do some fancy Rx operators on fooSource.FooArrivals, like Buffer, Window, Join and others and return IObservable<Bar>
}
}
}
// this class will subscribe to the bar source and do processing
class BarsProcessor
{
BarsProcessor(IBarSource barSource);
void Subscribe();
}
// ******************* Main ************************
class Program
{
public static void Main(string[] args)
{
var fooSource = FooSourceFactory.Create();
var barsProcessor = BarsProcessorFactory.Create(fooSource) // this will create FooSubsetToBarConverter and BarsProcessor
barsProcessor.Subscribe();
fooSource.Run(); // this enters a loop of listening for Foo objects from the network and notifying about their arrival.
}
}
Run Code Online (Sandbox Code Playgroud)
另一个建议另一个设计,它的主题是使用我们自己的发布者/订阅者接口,并仅在需要时在实现中使用Rx.
//********** interfaces *********
interface IPublisher<T>
{
void Subscribe(ISubscriber<T> subscriber);
}
interface ISubscriber<T>
{
Action<T> Callback { get; }
}
//********** implementations *********
class FooSource : IPublisher<Foo>
{
public void Subscribe(ISubscriber<Foo> subscriber) { /* ... */ }
// here we put logic that receives Foo objects from some source (the network?) publishes them to the registered subscribers
}
class FooSubsetsToBarConverter : ISubscriber<Foo>, IPublisher<Bar>
{
void Callback(Foo foo)
{
// here we put logic that aggregates Foo objects and publishes Bars when we have received a subset of Foos that match our criteria
// maybe we use Rx here internally.
}
public void Subscribe(ISubscriber<Bar> subscriber) { /* ... */ }
}
class BarsProcessor : ISubscriber<Bar>
{
void Callback(Bar bar)
{
// here we put code that processes Bar objects
}
}
//********** program *********
class Program
{
public static void Main(string[] args)
{
var fooSource = fooSourceFactory.Create();
var barsProcessor = barsProcessorFactory.Create(fooSource) // this will create BarsProcessor and perform all the necessary subscriptions
fooSource.Run(); // this enters a loop of listening for Foo objects from the network and notifying about their arrival.
}
}
Run Code Online (Sandbox Code Playgroud)
你觉得哪一个更好?公开IObservable<T>和制作我们的组件可以从Rx运营商创建新的事件流,或者定义我们自己的发布者/订阅者接口并在需要时在内部使用Rx?
以下是有关设计的一些事项:
在第一个设计中,我们接口的消费者在他/她的指尖拥有Rx的全部功能,并且可以执行任何Rx操作员.我们中的一个人声称这是一个优势而另一个人声称这是一个缺点.
第二种设计允许我们使用任何发布者/订阅者体系结构.第一个设计将我们与Rx联系在一起.
如果我们希望使用Rx的强大功能,则需要在第二个设计中进行更多工作,因为我们需要将自定义发布者/订阅者实现转换为Rx并返回.它需要为希望进行某些事件处理的每个类编写粘合代码.
Chr*_*oph 17
曝光IObservable<T>不会以任何方式使用Rx 污染设计.实际上,设计决策与暴露旧学校.NET事件或滚动自己的pub/sub机制之间的待决决定完全相同.唯一的区别是,这IObservable<T>是更新的概念.
需要证明吗?看看F#,它也是一种.NET语言,但比C#更年轻.在F#中,每个事件都来源于IObservable<T>.老实说,我认为抽象出一个非常合适的.NET发布/订阅机制是没有意义的 - 就是IObservable<T>- 用你自己开发的pub/sub抽象.暴露IObservable<T>.
滚动自己的发布/订阅抽象感觉就像将Java模式应用于.NET代码一样.不同的是,在.NET中,对Observer模式一直有很好的框架支持,根本不需要自己动手.
首先,这是值得注意的IObservable<T>是部分mscorlib.dll和System命名空间,从而暴露那就有点相当于暴露IComparable<T>或IDisposable.这相当于选择.NET作为您的平台,您似乎已经完成了.
现在,我不是建议一个答案,而是提出一个不同的问题,然后是一个不同的心态,我希望(并且相信)你将从那里进行管理.
您基本上要问:我们是否希望在整个系统中促进Rx运营商的分散使用?.现在显然这不是很吸引人,因为你可能在概念上将Rx视为第三方库.
无论哪种方式,答案都不在于您提出的基础设计,而在于这些设计的用户.我建议将您的设计分解为抽象级别,并确保Rx运算符的使用仅限于一个级别.当我谈论抽象级别时,我的意思是类似于OSI模型,只在同一个应用程序的代码中.
在我的书中,最重要的是不要采用"让我们创造一些将在整个系统中使用和分散的东西"的设计立场,因此我们需要确保我们只做一次而且恰到好处,因为所有这些年来".我更像是一个"让我们让这个抽象层产生其他层目前实现其目标所需的最小API ".
关于你的两个设计的简单性,它实际上很难判断,因为Foo并Bar没有告诉我很多用例,因此可读性因素(顺便说一下,从一个用例到另一个用例).
| 归档时间: |
|
| 查看次数: |
1094 次 |
| 最近记录: |