Ant*_*ony 101 c# system.reactive reactivex
我目前正在研究.NET的Reactive Extensions框架,我正在研究我发现的各种介绍资源(主要是http://www.introtorx.com)
我们的应用程序涉及许多检测网络帧的硬件接口,这些接口将是我的IObservables,然后我有各种组件将使用这些帧或对数据执行某种方式的转换并生成新类型的帧.例如,还有其他组件需要显示每个第n帧.我确信Rx对我们的应用程序有用,但是我正在努力处理IObserver接口的实现细节.
我读过的大多数(如果不是全部)资源都说我不应该自己实现IObservable接口,而是使用提供的函数或类之一.从我的研究看来,创建一个Subject<IBaseFrame>将为我提供我需要的东西,我会让我的单线程从硬件接口读取数据,然后调用我的Subject<IBaseFrame>实例的OnNext函数.然后,不同的IObserver组件将从该Subject接收其通知.
我的困惑来自本教程附录中的建议,其中说:
避免使用主题类型.Rx实际上是一种函数式编程范例.使用主题意味着我们现在正在管理状态,这可能会发生变异.同时处理变异状态和异步编程很难做到.此外,许多运算符(扩展方法)都经过精心编写,以确保维护订阅和序列的正确和一致的生命周期; 当你介绍科目时,你可以打破这个.如果您明确使用主题,未来版本也可能会出现明显的性能下降.
我的应用程序对性能至关重要,我显然会在进入生产代码之前测试使用Rx模式的性能; 但是我担心我正在通过使用Subject类来做违背Rx框架精神的事情,并且该框架的未来版本将损害性能.
有没有更好的方式做我想要的?无论是否有任何观察者,硬件轮询线程都将连续运行(否则HW缓冲区将备份),因此这是一个非常热门的序列.然后我需要将收到的帧传递给多个观察者.
任何建议将不胜感激.
Lee*_*ell 64
好的,如果我们忽略了我的教条方式而忽略了"主题好/坏".让我们看一下问题空间.
我打赌你要么有2种风格的系统中的一种需要深入了解.
对于选项1,简单,我们只需使用适当的FromEvent方法将其包装起来,我们就完成了.去酒吧!
对于选项2,我们现在需要考虑如何轮询这个以及如何有效地执行此操作.当我们获得价值时,我们如何发布它?
我想你会想要一个专用的线程进行轮询.你不会想要一些其他编码器锤击ThreadPool/TaskPool并让你处于ThreadPool饥饿状态.或者你不想要上下文切换的麻烦(我猜).所以假设我们有自己的线程,我们可能会有一些我们坐在轮询中的While/Sleep循环.当检查发现一些消息时,我们发布它们.所有这些听起来都非常适合Observable.Create.现在我们可能无法使用While循环,因为它不会允许我们返回一个Disposable以允许取消.幸运的是,你已经阅读了整本书,所以精通递归调度!
我想这样的事情可行.#NotTested
public class MessageListener
{
private readonly IObservable<IMessage> _messages;
private readonly IScheduler _scheduler;
public MessageListener()
{
_scheduler = new EventLoopScheduler();
var messages = ListenToMessages()
.SubscribeOn(_scheduler)
.Publish();
_messages = messages;
messages.Connect();
}
public IObservable<IMessage> Messages
{
get {return _messages;}
}
private IObservable<IMessage> ListenToMessages()
{
return Observable.Create<IMessage>(o=>
{
return _scheduler.Schedule(recurse=>
{
try
{
var messages = GetMessages();
foreach (var msg in messages)
{
o.OnNext(msg);
}
recurse();
}
catch (Exception ex)
{
o.OnError(ex);
}
});
});
}
private IEnumerable<IMessage> GetMessages()
{
//Do some work here that gets messages from a queue,
// file system, database or other system that cant push
// new data at us.
//
//This may return an empty result when no new data is found.
}
}
Run Code Online (Sandbox Code Playgroud)
我真的不喜欢Subjects的原因通常是开发人员没有真正对问题进行明确设计的情况.破解一个主题,在那里和任何地方戳它,然后让穷人支持开发人员猜测WTF正在进行中.当您使用Create/Generate等方法时,您正在对序列进行本地化处理.你可以在一种方法中看到它,你知道没有其他人会产生令人不快的副作用.如果我看到一个主题字段,我现在必须去寻找正在使用的类中的所有地方.如果某些MFer公开了一个,那么所有的赌注都会关闭,谁知道这个序列是如何被使用的!Async/Concurrency/Rx很难.你不需要通过允许副作用和因果关系编程来使你的头脑更加努力.
Wil*_*lka 33
一般来说,你应该避免使用Subject,但是对于你在这里做的事情,我认为它们的效果非常好.当我在Rx教程中遇到"避免主题"消息时,我问了一个类似的问题.
引用Dave Sexton(Rxx)
"主题是Rx的有状态组件.当你需要创建一个类似事件的observable作为字段或局部变量时,它们非常有用."
我倾向于使用它们作为Rx的切入点.所以,如果我有一些代码需要说'发生了什么'(就像你有的话),我会使用a Subject和call OnNext.然后将其公开IObservable为其他人订阅(您可以AsObservable()在您的主题上使用以确保没有人可以投射到主题并搞砸了).
您也可以通过.NET事件和使用来实现这一点FromEventPattern,但是如果我只是将事件变成一个事件IObservable,我不会看到有一个事件而不是一个事件的好处Subject(这可能意味着我失踪了这里的东西)
不过,你应该避免非常强烈被预订至IObservable一个Subject,即没有传递Subject到IObservable.Subscribe方法.
Nia*_*ton 24
通常,当您管理主题时,实际上只是重新实现Rx中已有的功能,并且可能没有那么强大,简单和可扩展的方式.
当您尝试将某些异步数据流调整为Rx(或从当前不是异步的异步数据流创建异步数据流)时,最常见的情况通常是:
数据来源是一个事件:正如李所说,这是最简单的情况:使用FromEvent并前往酒吧.
数据源来自同步操作,您需要轮询更新(例如,Web服务或数据库调用):在这种情况下,您可以使用Lee建议的方法,或者对于简单的情况,您可以使用类似的东西Observable.Interval.Select(_ => <db fetch>).您可能希望使用DistinctUntilChanged()来防止在源数据中没有任何更改时发布更新.
数据源是一种调用你的回调的异步api:在这种情况下,使用Observable.Create来连接你的回调以在观察者上调用OnNext/OnError/OnComplete.
数据源是一个阻塞直到新数据可用的调用(例如一些同步套接字读取操作):在这种情况下,您可以使用Observable.Create来包装从套接字读取并发布到Observer.OnNext的命令性代码.何时读取数据.这可能类似于您对主题所做的事情.
使用Observable.Create与创建管理Subject的类相当于使用yield关键字vs创建实现IEnumerator的整个类.当然,您可以将IEnumerator编写为与yield代码一样干净和良好的公民,但哪一个更好地封装并且感觉更整洁的设计?Observable.Create与管理Subjects的情况也是如此.
Observable.Create为您提供了一个干净的设置和干净的拆卸.如何通过包装主题的类来实现这一目标?你需要某种Start方法......你怎么知道何时调用它?或者你是否总是开始它,即使没有人在听?当你完成后,如何让它停止从套接字读取/轮询数据库等?您必须拥有某种Stop方法,并且您必须不仅可以访问您订阅的IObservable,而且还可以访问首先创建Subject的类.
使用Observable.Create,它们都被包含在一个地方.Observable.Create的主体在有人订阅之前不会运行,因此如果没有人订阅,则永远不会使用您的资源.并且Observable.Create返回一个可以干净地关闭资源/回调等的Disposable - 当Observer取消订阅时调用它.您用于生成Observable的资源的生命周期与Observable本身的生命周期紧密相关.
引用的块文本几乎解释了为什么你不应该使用它Subject<T>,但是为了简单起见,你将结合观察者和可观察的函数,同时在它们之间注入某种状态(无论你是封装还是扩展).
这是你遇到麻烦的地方; 这些责任应该是分开的,彼此不同.
也就是说,在您的具体情况下,我建议您将您的疑虑分解为更小的部分.
首先,您的线程很热,并始终监视硬件以获取通知的信号.你会怎么做? 活动.那么让我们从那开始吧.
让我们定义EventArgs你的事件将会触发.
// The event args that has the information.
public class BaseFrameEventArgs : EventArgs
{
public BaseFrameEventArgs(IBaseFrame baseFrame)
{
// Validate parameters.
if (baseFrame == null) throw new ArgumentNullException("IBaseFrame");
// Set values.
BaseFrame = baseFrame;
}
// Poor man's immutability.
public IBaseFrame BaseFrame { get; private set; }
}
Run Code Online (Sandbox Code Playgroud)
现在,将触发事件的类.请注意,这可能是一个静态类(因为你总是有一个线程中运行监控硬件缓存),或调用的按需订用的东西说.你必须适当地修改它.
public class BaseFrameMonitor
{
// You want to make this access thread safe
public event EventHandler<BaseFrameEventArgs> HardwareEvent;
public BaseFrameMonitor()
{
// Create/subscribe to your thread that
// drains hardware signals.
}
}
Run Code Online (Sandbox Code Playgroud)
所以现在你有一个暴露事件的类.Observables适用于事件.因此,IObservable<T>如果您遵循标准事件模式,通过类上的静态FromEventPattern方法,可以将第一类支持转换为事件流(将事件流视为事件的多次触发)转换为实现.Observable
通过事件源和FromEventPattern方法,我们可以IObservable<EventPattern<BaseFrameEventArgs>>轻松地创建(EventPattern<TEventArgs>该类体现了您在.NET事件中看到的内容,特别是派生自的实例EventArgs和代表发送者的对象),如下所示:
// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();
// Create the observable. It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
FromEventPattern<BaseFrameEventArgs>(
h => source.HardwareEvent += h,
h => source.HardwareEvent -= h);
Run Code Online (Sandbox Code Playgroud)
当然,你想要一个IObservable<IBaseFrame>,但这很容易,使用类上的Select扩展方法Observable来创建一个投影(就像在LINQ中一样,我们可以用一个易于使用的方法包装所有这些):
public IObservable<IBaseFrame> CreateHardwareObservable()
{
// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();
// Create the observable. It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
FromEventPattern<BaseFrameEventArgs>(
h => source.HardwareEvent += h,
h => source.HardwareEvent -= h);
// Return the observable, but projected.
return observable.Select(i => i.EventArgs.BaseFrame);
}
Run Code Online (Sandbox Code Playgroud)