Rx中基于数据的缓冲

Soe*_*Moe 1 system.reactive

让我先解释一下我想要实现的目标.

假设我从事件流中传入了以下数据

var data = new string[] { 
                "hello", 
                "Using", 
                "ok:michael", 
                "ok", 
                "begin:events", 
                "1:232", 
                "2:343", 
                "end:events", 
                "error:dfljsdf",
                "fdl", 
                "error:fjkdjslf",
                "ok"  
            };
Run Code Online (Sandbox Code Playgroud)

当我订阅数据源时,我希望得到以下结果

"ok:michael"
"ok"
"begin:events 1:232 2:343 end:events"
"error:dfljsdf"
"error:fjkdjslf"
"ok"
Run Code Online (Sandbox Code Playgroud)

基本上,我希望得到以ok或错误开头的数据以及开始和结束之间的数据.

到目前为止我试过这个..

var data = new string[] { 
                "hello", 
                "Using", 
                "ok:michael", 
                "ok", 
                "begin:events", 
                "1:232", 
                "2:343", 
                "end:events", 
                "error:dfljsdf",
                "fdl", 
                "error:fjkdjslf",
                "ok"  
            };



            var dataStream = Observable.Generate(
                                data.GetEnumerator(), 
                                e => e.MoveNext(), 
                                e => e, 
                                e => e.Current.ToString(), 
                                e => TimeSpan.FromSeconds(0.1));         

            var onelineStream = from d in dataStream
                                where d.StartsWith("ok") || d.StartsWith("error")
                                select d;

            // ???
            // may be need to buffer? I want to get data like "begin:events 1:232 2:343 end:events"
            // but it is not working...
            var multiLineStream = from list in dataStream.Buffer<string, string, string>(
                                bufferOpenings: dataStream.Where(d => d.StartsWith("begin")),
                                bufferClosingSelector: b => dataStream.Where(d => d.StartsWith("end")))
                              select String.Join(" ", list);

            // merge two stream????
            // but I have no clue how to merge these twos :(

            mergeStream .Subscribe(d =>
            {
                Console.WriteLine(d);
                Console.WriteLine();
            });
Run Code Online (Sandbox Code Playgroud)

由于我对Reactive编程很陌生,所以我无法以反应的方式思考.:(

提前致谢.

Eni*_*ity 6

你是如此,非常接近正确的答案!

基本上你有onelineStream&multiLineStream查询恰到好处.

将它们合并在一起非常容易.这样做:

onelineStream.Merge(multiLineStream)
Run Code Online (Sandbox Code Playgroud)

但是,您的查询不足之处Observable.Generate在于您曾经介绍过值之间的延迟.这会产生一个可观察的观察结果,如果你有多个订阅者,那就是"粉丝"这些值.

根据您的数据和您的定义来dataStream查看此代码的行为:

dataStream.Select(x => "!" + x).Subscribe(Console.WriteLine);
dataStream.Select(x => "@" + x).Subscribe(Console.WriteLine);
Run Code Online (Sandbox Code Playgroud)

你得到这些价值观:

!hello
@Using
!ok:michael
@ok
@1:232
!begin:events
@2:343
!end:events
!fdl
@error:dfljsdf
!error:fjkdjslf
@ok
Run Code Online (Sandbox Code Playgroud)

请注意,有些是由一个订阅处理的,而其他的则由另一个订阅处理.这意味着即使您的onelineStream&multiLineStream查询恰到好处,它们也只会看到一些数据,因此不会像您期望的那样运行.

您还可以获得可以跳过和复制值的竞争条件.因此,最好避免这种可观察性.

在值之间引入延迟的更好方法是:

var dataStream = data.ToObservable().Do(_ => Thread.Sleep(100));
Run Code Online (Sandbox Code Playgroud)

现在,这会创建一个"冷"可观察对象,这意味着每个新订阅者将从第一个值开始获得对可观察对象的全新订阅.

您的multiLineStream查询将无法在冷可观察量上正常工作.

为了使数据流成为"热"可观察对象(在订户之间共享值),我们使用Publish运算符.

所以,multiLineStream现在看起来像这样:

var multiLineStream =
    dataStream.Publish(ds =>
        from list in ds.Buffer(
            ds.Where(d => d.StartsWith("begin")),
            b => ds.Where(d => d.StartsWith("end")))
        select String.Join(" ", list));
Run Code Online (Sandbox Code Playgroud)

然后你可以得到你的结果:

onelineStream.Merge(multiLineStream).Subscribe(d =>
{
    Console.WriteLine(d);
    Console.WriteLine();
});
Run Code Online (Sandbox Code Playgroud)

这就是我得到的:

ok:michael
ok
begin:events 1:232 2:343 end:events
error:dfljsdf
error:fjkdjslf
ok
Run Code Online (Sandbox Code Playgroud)

如果这对您有用,请告诉我.