ReactiveExtensions Observable FromAsync调用两次Function

jjc*_*hiw 3 c# task system.reactive

好的,试着去理解Rx,有点迷失在这里.

FromAsyncPattern现在已被弃用,所以我从这里拿到了示例(使用Rx轻松完成任务),它可以工作,我只是做了一些更改,而不是等待等待观察和订阅.....

我不明白的是为什么称为SumSquareRoots函数的两倍?

 var res = Observable.FromAsync(ct => SumSquareRoots(x, ct))
                                      .Timeout(TimeSpan.FromSeconds(5));

            res.Subscribe(y => Console.WriteLine(y));

            res.Wait();


class Program
{
    static void Main(string[] args)
    {
        Samples();
    }

    static void Samples()
    {
        var x = 100000000;

        try
        {
            var res = Observable.FromAsync(ct => SumSquareRoots(x, ct))
                                      .Timeout(TimeSpan.FromSeconds(5));

            res.Subscribe(y => Console.WriteLine(y));

            res.Wait();
        }
        catch (TimeoutException)
        {
            Console.WriteLine("Timed out :-(");
        }
    }

    static Task<double> SumSquareRoots(long count, CancellationToken ct)
    {
        return Task.Run(() =>
        {
            var res = 0.0;
            Console.WriteLine("Why I'm called twice");
            for (long i = 0; i < count; i++)
            {
                res += Math.Sqrt(i);

                if (i % 10000 == 0 && ct.IsCancellationRequested)
                {
                    Console.WriteLine("Noticed cancellation!");
                    ct.ThrowIfCancellationRequested();
                }
            }

            return res;
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

Ana*_*tts 12

这两次调用SumSquareRoots的原因是因为你订阅了两次:

// Subscribes to res
res.Subscribe(y => Console.WriteLine(y));

// Also Subscribes to res, since it *must* produce a result, even
// if that result is then discarded (i.e. Wait doesn't return IObservable)
res.Wait();
Run Code Online (Sandbox Code Playgroud)

SubscribeforeachRx的-就像如果你foreachIEnumerable两倍,你可能最终做2倍的工作,多Subscribes表示多的工作.要撤消此操作,您可以使用不会丢弃结果的阻止调用:

Console.WriteLine(res.First());
Run Code Online (Sandbox Code Playgroud)

或者,您可以使用Publish"冻结"结果并将其播放回> 1个订阅者(类似于您ToArray在LINQ中的使用方式):

res = res.Publish();
res.Connect();

// Both subscriptions get the same result, SumSquareRoots is only called once
res.Subscribe(Console.WriteLine);
res.Wait();
Run Code Online (Sandbox Code Playgroud)

您可以遵循的一般规则是,任何返回IObservable<T>Task<T>将导致订阅的Rx方法(*)

*- 技术上不正确.但是如果你这样想的话,你的大脑会感觉更好.