如何在Rx.NET中以正确的方式约束并发

mar*_*ark 6 c# reactive-programming system.reactive

请观察以下代码段:

var result = await GetSource(1000).SelectMany(s => getResultAsync(s).ToObservable()).ToList();
Run Code Online (Sandbox Code Playgroud)

这段代码的问题getResultAsync是以无约束的方式同时运行.在某些情况下,这可能不是我们想要的.假设我想将其并发限制为最多10个并发调用.什么是Rx.NET方法呢?

我附上一个简单的控制台应用程序,演示了所描述问题的主题和我的蹩脚解决方案.

还有一些额外的代码,比如Stats类和人工随机睡眠.它们用于确保我真正获得并发执行,并且可以可靠地计算在此过程中达到的最大并发性.

该方法RunUnconstrained展示了天真的,无约束的运行.该方法RunConstrained显示了我的解决方案,这不是很优雅.理想情况下,我想通过简单地将专用Rx运算符应用于Monad来简化约束并发性.当然,不会牺牲性能.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace RxConstrainedConcurrency
{
    class Program
    {
        public class Stats
        {
            public int MaxConcurrentCount;
            public int CurConcurrentCount;
            public readonly object MaxConcurrentCountGuard = new object();
        }

        static void Main()
        {
            RunUnconstrained().GetAwaiter().GetResult();
            RunConstrained().GetAwaiter().GetResult();
        }
        static async Task RunUnconstrained()
        {
            await Run(AsyncOp);
        }
        static async Task RunConstrained()
        {
            using (var sem = new SemaphoreSlim(10))
            {
                await Run(async (s, pause, stats) =>
                {
                    // ReSharper disable AccessToDisposedClosure
                    await sem.WaitAsync();
                    try
                    {
                        return await AsyncOp(s, pause, stats);
                    }
                    finally
                    {
                        sem.Release();
                    }
                    // ReSharper restore AccessToDisposedClosure
                });
            }
        }
        static async Task Run(Func<string, int, Stats, Task<int>> getResultAsync)
        {
            var stats = new Stats();
            var rnd = new Random(0x1234);
            var result = await GetSource(1000).SelectMany(s => getResultAsync(s, rnd.Next(30), stats).ToObservable()).ToList();
            Debug.Assert(stats.CurConcurrentCount == 0);
            Debug.Assert(result.Count == 1000);
            Debug.Assert(!result.Contains(0));
            Debug.WriteLine("Max concurrency = " + stats.MaxConcurrentCount);
        }

        static IObservable<string> GetSource(int count)
        {
            return Enumerable.Range(1, count).Select(i => i.ToString()).ToObservable();
        }

        static Task<int> AsyncOp(string s, int pause, Stats stats)
        {
            return Task.Run(() =>
            {
                int cur = Interlocked.Increment(ref stats.CurConcurrentCount);
                if (stats.MaxConcurrentCount < cur)
                {
                    lock (stats.MaxConcurrentCountGuard)
                    {
                        if (stats.MaxConcurrentCount < cur)
                        {
                            stats.MaxConcurrentCount = cur;
                        }
                    }
                }

                try
                {
                    Thread.Sleep(pause);
                    return int.Parse(s);
                }
                finally
                {
                    Interlocked.Decrement(ref stats.CurConcurrentCount);
                }
            });
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Jam*_*rld 10

您可以使用Merge限制内部可观察量的并发订阅数量的重载在Rx中执行此操作.

这种形式Merge适用于流的流.

通常,使用SelectMany从事件调用异步任务会执行两个作业:它将每个事件投影到一个可观察的流中,其单个事件就是结果,并将所有生成的流展平在一起.

要使用,Merge我们必须使用常规Select将每个事件投影到异步任务的调用中(从而创建流的流),并用于Merge展平结果.它将通过在任何时间点仅订阅所提供的固定数量的内部流来以受约束的方式执行此操作.

我们必须小心,只有在订阅包装内部流时才调用每个异步任务调用.将异步任务转换为observable ToObservable()实际上会立即调用异步任务,而不是订阅,因此我们必须将评估推迟到订阅使用Observable.Defer.

以下是将所有这些步骤放在一起的示例:

void Main()
{
    var xs = Observable.Range(0, 10); // source events

    // "Double" here is our async operation to be constrained,
    // in this case to 3 concurrent invocations

    xs.Select(x =>
       Observable.Defer(() => Double(x).ToObservable())).Merge(3)
      .Subscribe(Console.WriteLine,
                 () => Console.WriteLine("Max: " + MaxConcurrent));


}

private static int Concurrent;
private static int MaxConcurrent;
private static readonly object gate = new Object();

public async Task<int> Double(int x)
{
    var concurrent = Interlocked.Increment(ref Concurrent);
    lock(gate)
    {
        MaxConcurrent = Math.Max(concurrent, MaxConcurrent);
    }

    await Task.Delay(TimeSpan.FromSeconds(1));

    Interlocked.Decrement(ref Concurrent);

    return x * 2;
}
Run Code Online (Sandbox Code Playgroud)

这里的最大并发输出将是"3".删除合并以"不受约束",你将获得"10".

获得Defer读取效果更好的效果的另一种(等效)方法是使用FromAsync而不是Defer+ ToObservable:

xs.Select(x => Observable.FromAsync(() => Double(x))).Merge(3)
Run Code Online (Sandbox Code Playgroud)