异步流与反应式扩展相比如何?

ca9*_*3d9 10 c# system.reactive c#-8.0 iasyncenumerable

如何比较以下两个?Rx 更强大吗?

反应式扩展:

var observable = Observable.Create<char>(async (observer, cancel) =>
{
    while (true)
    {
        string line = await sr.ReadLineAsync();
        if (line == null)
            break;
        observer.OnNext(line);
    }
});

observable.Subscribe(
    c => Console.WriteLine(c.ToString()),
    () => end.Dispose());
Run Code Online (Sandbox Code Playgroud)

异步流:

public async void Run(string path)
{
    await foreach (var line in TestAsync())
    {
        Console.WriteLine(line);
    }
}

private async IAsyncEnumerable<string> TestAsync()
{
    while (true)
    {
        string line = await sr.ReadLineAsync();
        if (line == null)
            break;
        yield return line;
    }
}
Run Code Online (Sandbox Code Playgroud)

Pan*_*vos 11

这两个功能协同工作。PS:算了async streams,想想await foreach

异步流

异步流是一个(相对)低级特性,允许异步迭代。就其本身而言,它不提供任何其他功能,如过滤、聚合等。它是基于拉的,而 Rx 是基于推送的。

您可以通过System.Linq.Async库在异步流上使用 LINQ 运算符,该库位于 ..... ReacticeX.NET Github 存储库中。它很快,但不提供 Rx 的事件处理功能。

例如,没有时间的概念,更不用说使用自定义调度程序的方法了。没有订阅,没有错误事件。GroupBy 将消耗整个源并将组项目作为单独的IAsyncEnumerable实例发出,而 Rx 的 GroupBy 将为每个组发出单独的 Observable。

在问题的示例中, IAsyncEnumerable 是一个自然的选择,因为不涉及事件逻辑,只是在异步迭代器上进行迭代。

如果示例尝试轮询例如远程服务并检测故障峰值(即每个间隔的故障数超过阈值) IAsyncEnumerable 将是不合适的,因为它会阻止等待所有响应。事实上,我们根本无法聚合每次的事件。

穿线

真的没有 - IAsyncEnumerable 或await foreach调用不指定事件是如何产生或消费的。如果我们想使用单独的任务来处理一个项目,我们必须自己创建它,例如:

public async Task Run(string path)
{
    await foreach (var line in LoadStockTrades())
    {
        var result = await Task.Run(()=>AnalyzeTrade(line));
        Console.WriteLine($"{result} : {line});
    }
}
Run Code Online (Sandbox Code Playgroud)

反应式扩展

Reactive Extensions 是一个处理事件流的高级库。它是基于推送的,它理解时间,但它也比异步流或通道等低级结构慢。

在问题的示例中,Rx 会大材小用。轮询和检测尖峰很容易,有多个窗口选项。

System.Linq.Async 可以从 IAsyncEnumerable 和ToObservable创建一个 Observable ,这意味着 IAsyncEnumerable 可以用作 Rx 的源。

穿线

默认情况下,Rx 是单线程的,这对于它的主要场景 - 事件流处理非常有意义。

另一方面,Rx 允许发布者、订阅者和操作者在相同或不同的线程上运行。在这种语言具备async/await或数据流(例如,Java,JavaScript的),Rx的使用运行在不同的线程的发布者和订阅模拟并行处理管线。

  • @ca9163d9 Rx 在其核心场景——事件流处理中非常有用。在 Java、JavaScript 等其他语言中,它用于模拟数据流和异步处理,但 C# 已经为此提供了更好的构造。您想统计请求或失败的数量吗?您可以使用“Window”来收集窗口中的事件,“Count()”和“Where()”仅在失败计数超过限制时检索通知。在没有 Rx 的情况下,这需要大量的编码。 (2认同)