如何根据第二个流拆分块中的Observable流?

hko*_*hko 9 c# system.reactive

我觉得这很容易,但我的脑子现在正在融化......

问题

鉴于以下IObservable<int>流: 1 1 0 0 0 1 0 0 1 0 1

我想将它拆分为IObservable<IEnumerable<int>>表单的Stream

1

1 0 0 0

1 0 0

1 0

1

因此,只要有0,它就会被添加到IEnumerable中,当1出现时,会启动一个新的List; 对于我的真正问题,这是一个更清晰的定义.

到目前为止我的方法

我认为一个好的解决方案是首先将它转换为一个IObservable<IObservable<int>>通过Window方法,然后使用ToEnumerable,但不知何故我不能让它工作..我用ZipSkip(1)得到一个差异到最后一个元素,我也用过DistinctUntilChanged().我饶了你试过的所有variantes ......

可能我最接近的是这段代码:

int[] ints = new[] { 1, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1 };
var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Take(11).Select(i => ints[i]);

Subject<int> subject = new Subject<int>();
observable.Subscribe(subject);

var observableDiff = subject.Skip(1).Zip(subject, (n, p) => new { Previous = p, Next = n });
var windows = observable.Window(() => observableDiff.Where(x => x.Next == 1));

int index = 0;
windows.Subscribe(window =>
{
  Console.WriteLine(string.Format("new window [{0}] ", index++));
  window.Subscribe(number => Console.WriteLine(number));
});
Run Code Online (Sandbox Code Playgroud)

这会带来好的结果,但遗憾的是它最终会崩溃.

new window [0]
1
new window [1]
1
0
0
0
new window [2]
1
0
0
new window [3]
1
0
new window [4]
new window [5]
new window [6]
new window [7]
new window [8]
new window [9]
<-- it goes on here until window ~ [80] with a stackoverflow exception
Run Code Online (Sandbox Code Playgroud)

如果我的代码中的错误不存在,我会实现它...

任何帮助将非常感激.:)

编辑:我使用Rx-Experimental,但它没有区别(用LinqPad检查).同时删除了主题,它没有影响任何东西.看来我的新方法(Edit2),你需要一个主题,否则窗口的开始是完全奇怪的.

Edit2:稍微改变了问题,以便更好地突出我的问题,抱歉.还更新了我的解决方案

Eni*_*ity 12

这对我有用:

var ints = (new[] { 1, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1 }).ToObservable();

var result =
    ints
        .Publish(ns =>
            ns
                .Where(n => n == 1)
                .Select(n =>
                    ns.TakeWhile(m => m == 0).StartWith(n).ToArray())
        ).Merge();
Run Code Online (Sandbox Code Playgroud)

我已经习惯于Publish确保将ints可观察对象视为"热"而不是"冷".

我的结果如下:

分组整数