有没有人知道这个手工 if/then/else运算符的适当替代品用于反应式扩展(.Net/C#)?
public static IObservable<TResult> If<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, bool> predicate,
Func<TSource, IObservable<TResult>> thenSource,
Func<TSource, IObservable<TResult>> elseSource)
{
return source
.SelectMany(
value => predicate(value)
? thenSource(value)
: elseSource(value));
}
Run Code Online (Sandbox Code Playgroud)
用法示例(假设numbers类型为IObservable<int>:
numbers.If(
predicate: i => i % 2 == 0,
thenSource: i => Observable
.Return(i)
.Do(_ => { /* some side effects */ })
.Delay(TimeSpan.FromSeconds(1)), // some other operations
elseSource: i => Observable
.Return(i)
.Do(_ => { /* some other side effects */ }));
Run Code Online (Sandbox Code Playgroud) 有人可以解释为什么AsObservable即使到达流末尾,以下方法也会创建无限循环吗?
public static class StreamExt {
public static IObservable<byte> AsObservable(this Stream stream, int bufferSize) {
return Observable
.FromAsync(cancel => stream.ReadBytes(bufferSize, cancel))
.Repeat()
.TakeWhile(bytes => bytes != null) // EndOfStream
.SelectMany(bytes => bytes);
}
private static async Task<byte[]> ReadBytes(this Stream stream, int bufferSize, CancellationToken cancel) {
var buf = new byte[bufferSize];
var bytesRead = await stream
.ReadAsync(buf, 0, bufferSize, cancel)
.ConfigureAwait(false);
if (bytesRead < 1) return null; // EndOfStream
var result_size = Math.Min(bytesRead, bufferSize);
Array.Resize(ref buf, result_size);
return buf; …Run Code Online (Sandbox Code Playgroud)