创建后推送新值

Vin*_*nzo 1 c# system.reactive

阅读 IntroToRx 网站,它不鼓励使用Subject,而是使用 Observable.Create 辅助方法。

正如我所看到的,OnNext 方法只能在 subscribe 方法中调用,因为它是我可以访问 Observer 对象的唯一部分。

如果我想在创建新值后推送新值怎么办?我是否“被迫”使用主题?

Jer*_*all 5

如果你只是在探索 Rx,那就去做吧 - 使用主题,疯狂,看看它们是如何工作的,自己发现它们的优点和缺点,然后回到这里并阅读讨论为什么主题不受欢迎的问题。

主题提供了一种更简单的方法来“快速引导”想法和复杂的 Rx 场景,而无需复制实际的源条件。

也就是说,它们确实状态注入到了某种无状态的操作链中,所以要小心不要依赖它们。

所以,总结一下:如果您尝试生成序列来测试/学习 rx 如何工作或如何进行查询 X,请使用这些主题。如果您发现自己在查询内使用它们,那么很可能有更好的方法。

编辑:意识到我错过了一些东西:

另外,您询问是否有另一种方法可以在创建后引发流事件……答案是肯定的;您可以通过 Create 或 Return 或Generate 声明一个流,该流返回您定义的任何旧的基于 IObservable 的对象,该对象还可以公开注入事件的方法......或者哎呀,有一个 lambda 旋转一个线程来检查共享列表被路由到返回流......我想我想说的是可能性是无限的。在 Observable 上声明了十几个“创建事件序列”方法 - 全部尝试!

编辑2:

一个例子?Observable.Create当然,让我们使用模仿一个非常低效的东西来组合一些东西Subject

var running = true;
var values = new ConcurrentQueue<int>();
var query = Observable.Create<int>(obs =>
{
    var body = Task.Factory.StartNew(()=>
    {
        while(running)
        {
            int nextValue;
            if(values.TryDequeue(out nextValue))
            {
                obs.OnNext(nextValue);
            }
            Thread.Yield();
        }
    });
    return Disposable.Create(() =>
    {
        try
        {
            running = false;
            body.Wait();
            obs.OnCompleted();            
        }
        catch(Exception ex)
        {
            obs.OnError(ex);
        }
    });
});
using(query.Subscribe(Console.WriteLine))
{
    values.Enqueue(1);
    values.Enqueue(2);
    values.Enqueue(3);
    values.Enqueue(4);
    Console.ReadLine();
}
Run Code Online (Sandbox Code Playgroud)

请注意,这只是快速且极其肮脏的示例代码。:)