相关疑难解决方法(0)

一次执行一个异步可观察

我正在尝试将一些TPL异步集成到更大的Rx链中Observable.FromAsync,就像在这个小例子中一样:

using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace rxtest
{
    class Program
    {
        static void Main(string[] args)
        {
            MainAsync().Wait();
        }

        static async Task MainAsync()
        {
            await Observable.Generate(new Random(), x => true,
                                      x => x, x => x.Next(250, 500))
                .SelectMany((x, idx) => Observable.FromAsync(async ct =>
                {
                    Console.WriteLine("start:  " + idx.ToString());
                    await Task.Delay(x, ct);
                    Console.WriteLine("finish: " + idx.ToString());
                    return idx;
                }))
                .Take(10)
                .LastOrDefaultAsync();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

但是,我注意到这似乎同时启动所有异步任务,而不是一次启动它们,这会导致应用程序的内存使用量增加.在SelectMany似乎作用并不比不同Merge.

在这里,我看到这样的输出:

start:  0
start:  1
start:  2
...
Run Code Online (Sandbox Code Playgroud)

我想看: …

.net c# task-parallel-library system.reactive

4
推荐指数
1
解决办法
790
查看次数

F#Reactive等待Observable完成

我发现了各种各样的SO问题,但无法找出F#解决方案.我需要阻止等待事件向我发射以检查它返回的数据.我使用Rx接收事件3次:

let disposable =
    Observable.take 3 ackNack
    |> Observable.subscribe (
        fun (sender, data) ->
            Console.WriteLine("{0}", data.AckNack)
            Assert.True(data.TotalAckCount > 0u)
    )
Run Code Online (Sandbox Code Playgroud)

我想将结果转换为列表,以便稍后可以通过测试框架(xUnit)进行检查,或等待所有3个事件完成并通过Assert.True.

在继续之前,我将如何等待3场比赛?我可以看到Observable.wait其他消息来源的建议Async.RunSynchronously.

f# xunit system.reactive

3
推荐指数
1
解决办法
311
查看次数

标签 统计

system.reactive ×2

.net ×1

c# ×1

f# ×1

task-parallel-library ×1

xunit ×1