可观察到循环推送值列表

Sup*_*JMN 2 .net c# system.reactive

我想创建一个Observable,每秒都会持续推送一个值列表t.

例如,鉴于{1,2,3,4}订阅者应该得到这个:

1,2,3,4,1,2,3,4,1,2,3,4,1,2 ......

class Program
{
    static void Main()
    {
        var observable = Observable
            .Interval(TimeSpan.FromSeconds(3))
            .Zip(Observable.Range(1, 4)
            .Repeat(), (_, count) => count);

        observable.Subscribe(Console.WriteLine);

        Console.WriteLine("Finished!");
    }
}
Run Code Online (Sandbox Code Playgroud)
  • 我已经研究过这个例子,它似乎有效,但是有一个非常讨厌的问题:Main方法永远不会结束它的执行!为什么?:(

  • 更糟糕的是,几分钟后,这个控制台应用程序抛出一个OutOfMemoryException!

Lee*_*ell 6

.Repeat()对我来说似乎是一个简单的错误.

class Program
{
    static void Main()
    {
        var observable = Observable
            .Interval(TimeSpan.FromSeconds(3))
            .Zip(Observable.Range(1, 4), (_, count) => count)
            .Repeat();

        observable.Subscribe(Console.WriteLine);

        Console.WriteLine("Finished!");
        Console.ReadLine();
    }
}
Run Code Online (Sandbox Code Playgroud)

这将是:

  • 不阻止控制台完成
  • 不抛出OutOfMemoryException.

注意,没有使用.Do(),没有自定义扩展方法,没有无限制的IEnumerables阻塞线程;-)

...并且没有Zip这样的替代实施,希望外行开发者能够阅读和理解(也可以处置!):

class Program
{
    static void Main()
    {
        var observable = Observable
            .Interval(TimeSpan.FromSeconds(3))
            //.Zip(Observable.Range(1, 4), (_, count) => count)
            .Select(i=>i+1)
            .Take(4)
            .Repeat();

        using (observable.Subscribe(Console.WriteLine))
        {
            Console.WriteLine("Running...");
            Console.ReadLine();
        }
        Console.WriteLine("Finished!");
    }
}
Run Code Online (Sandbox Code Playgroud)


Cha*_*ger 5

我删除了我的建议答案,因为Timothy和Lee的答案都使用了内置的Rx功能,而且更加优雅.不过,我会留下对这个问题的解释,因为我认为它很有用:

期望Observable是推送序列,并且Zip在等待来自第二个的值与下一个值配对时将从更快的生成流中排队项目.由于Obsevable.Range返回这些值与订阅者可以处理的速度一样快,这将填满所有内存并阻塞线程.