如何将实体推送到Rx Observable?

GWL*_*osa 6 .net c# system.reactive

我有一个类负责在频繁但不规则的时间间隔内生成事件,其他类必须消耗和操作.我想将Reactive Extensions用于此任务.

消费者方面非常直截了当; 我有我的消费类实现IObserver<Payload>,一切似乎都很好.问题出现在生产者类上.

IObservable<Payload>直接实现(也就是说IDisposable Subscribe(IObserver<Payload> ),根据文档,我不建议使用我自己的实现.它建议改为使用Observable.Create()函数集合.由于我的类将运行很长时间,我已经尝试创建一个Observable var myObservable = Observable.Never(),然后,当我有新的Payloads可用时,调用myObservable.Publish(payloadData).但是,当我这样做时,我似乎没有OnNext在我的消费者中实现.

我认为,作为一种解决方法,我可以在我的类中创建一个事件,然后使用该FromEvent函数创建Observable ,但这似乎是一种过于复杂的方法(即,Observables的新热点需要'看起来很奇怪工作的事件).有一个简单的方法我在这里俯瞰吗?创建自己的Observable源的"标准"方法是什么?

Jam*_*rld 7

创建一个new Subject<Payload>并调用它的OnNext方法来发送一个事件.您可以Subscribe与观察者一起探讨这个问题.

主题的使用经常被争论.有关使用主题(链接到引物)的详细讨论,请参阅此处 - 但总的来说,此用例听起来有效(即它可能符合本地+热门标准).

顺便说一下,订阅接受代表的重载可能会消除您提供实施的需要IObserver<T>.