Rx很棒,但有时很难找到优雅的做事方式.这个想法很简单.我收到一个带有byte []的事件,这个数组可能包含一行,多行或一行.我想要的是找到一种方法来生成一个IObservable of Line IObservable<String>,其中序列的每个元素都是一条线.
几小时后,我发现最接近的解决方案非常难看,当然不能正常工作,因为扫描触发每个字符上的OnNext:
//Intermediate subject use to transform byte[] into char
var outputStream = new Subject<char>();
_reactiveSubcription = outputStream
//Scan doesn't work it trigger OnNext on every char
//Aggregate doesn't work neither as it doesn't return intermediate result
.Scan(new StringBuilder(), (builder, c) => c == '\r' ? new StringBuilder() : builder.Append((char)c))
.Subscribe(this);
Observable.FromEventPattern<ShellDataEventArgs>(shell, "DataReceived")
//Data is a byte[]
.Select(_ => _.EventArgs.Data)
.Subscribe(array => array.ToObservable()
//Convert into char
.ForEach(c => outputStream.OnNext((char)c)));
Run Code Online (Sandbox Code Playgroud)
注意:_reactiveSubcription应该是IObservable<String> …