为什么 C# Rx Subscribe() 函数不适用于“async”关键字?

Tro*_*yvs 1 c# asynchronous subscribe system.reactive async-await

我有这个代码片段:

static void Main(string[] args)
{
    Observable.Range(1, 5).Subscribe(async x => await DoTheThing(x));
    Console.WriteLine("done");
}

static async Task DoTheThing(int x)
{
    await Task.Delay(TimeSpan.FromSeconds(x));
    Console.WriteLine(x);
}
Run Code Online (Sandbox Code Playgroud)

我希望它会循环 5 次,每次循环后都会有一行打印为

1
2
3
4
5
Run Code Online (Sandbox Code Playgroud)

但令人惊讶的是,这将打印“完成”并立即终止。似乎 async+await 没有等待 Task.Delay 并退出。

语义似乎没有问题,那么我在 Subscribe 或 async 哪里出错了,如何修复它以满足我从 Rx 调用异步任务的请求?

谢谢。

Ast*_*sti 5

它不是阻塞,因为它很好 - 异步。您的代码生成五个任务,所有任务都并行运行,并且都在不同时间完成。

但它们不会阻塞该Main功能。如果您只添加 aConsole.ReadKey()作为最后一行,您将看到您的代码确实在后台运行。它打印。

static void Main(string[] args)
{
    Observable.Range(1, 5).Subscribe(async x => await DoTheThing(x));
    Console.WriteLine("done");
    Console.ReadKey();
}
Run Code Online (Sandbox Code Playgroud)

但是假设您想等到所有这些都完成。然后怎样呢?

当然,有.Wait(),但那是阻塞。让我们将我们所有的任务都作为可观察对象来观察。

我们将在使用 C# 7async Main时使用它。

static async Task Main(string[] args)
{
    await Observable.Range(1, 5)
        .Select(x => DoTheThing(x).ToObservable())
        .Merge();                

    Console.WriteLine("done");
}
Run Code Online (Sandbox Code Playgroud)

这与您期望的完全一样。