Posts by Dan*_*ler

Is there an if/then/else operator for observables in C#?

Does anyone know of a proper replacement for this hand-rolled if/then/else operator for Reactive Extensions (.NET/C#)?

public static IObservable<TResult> If<TSource, TResult>(
  this IObservable<TSource> source,
  Func<TSource, bool> predicate,
  Func<TSource, IObservable<TResult>> thenSource,
  Func<TSource, IObservable<TResult>> elseSource) 
{
  return source
    .SelectMany(
      value => predicate(value)
        ? thenSource(value)
        : elseSource(value));
}

Usage example (assuming numbers is of type IObservable<int>):

numbers.If(
  predicate: i => i % 2 == 0,
  thenSource: i => Observable
    .Return(i)
    .Do(_ => { /* some side effects */ })
    .Delay(TimeSpan.FromSeconds(1)), // some other operations
  elseSource: i => Observable
    .Return(i)
    .Do(_ => { /* some other side effects */ }));

.net c# system.reactive

Score: 6 · Answers: 1 · Views: 595

Observable.FromAsync + Repeat + TakeWhile combination creates an infinite loop

Can someone explain why the following AsObservable method creates an infinite loop even after the end of the stream has been reached?

public static class StreamExt {
    public static IObservable<byte> AsObservable(this Stream stream, int bufferSize) {
        return Observable
            .FromAsync(cancel => stream.ReadBytes(bufferSize, cancel))
            .Repeat()
            .TakeWhile(bytes => bytes != null) // EndOfStream
            .SelectMany(bytes => bytes);
    }

    private static async Task<byte[]> ReadBytes(this Stream stream, int bufferSize, CancellationToken cancel) {
        var buf = new byte[bufferSize];
        var bytesRead = await stream
            .ReadAsync(buf, 0, bufferSize, cancel)
            .ConfigureAwait(false);

        if (bytesRead < 1) return null; // EndOfStream
        var result_size = Math.Min(bytesRead, bufferSize);
        Array.Resize(ref buf, result_size);
        return buf; …

c# system.reactive reactive

Score: 5 · Answers: 1 · Views: 1060

Tag statistics

c# ×2

system.reactive ×2

.net ×1

reactive ×1