小编Lau*_*ent的帖子

将IObservable <byte []>拆分为字符然后再行

Rx很棒,但有时很难找到优雅的做事方式.这个想法很简单.我收到一个带有byte []的事件,这个数组可能包含一行,多行或一行.我想要的是找到一种方法来生成一个IObservable of Line IObservable<String>,其中序列的每个元素都是一条线.

几小时后,我发现最接近的解决方案非常难看,当然不能正常工作,因为扫描触发每个字符上的OnNext:

//Intermediate subject use to transform byte[] into char
var outputStream = new Subject<char>();
_reactiveSubcription = outputStream
    //Scan doesn't work it trigger OnNext on every char
    //Aggregate doesn't work neither as it doesn't return intermediate result
    .Scan(new StringBuilder(), (builder, c) => c == '\r' ? new StringBuilder() : builder.Append((char)c))
    .Subscribe(this);


Observable.FromEventPattern<ShellDataEventArgs>(shell, "DataReceived")
            //Data is a byte[]
            .Select(_ => _.EventArgs.Data)
            .Subscribe(array => array.ToObservable()
            //Convert into char
            .ForEach(c => outputStream.OnNext((char)c)));
Run Code Online (Sandbox Code Playgroud)

注意:_reactiveSubcription应该是IObservable<String> …

c# system.reactive

3
推荐指数
1
解决办法
451
查看次数

标签 统计

c# ×1

system.reactive ×1