Creating an observable from a periodic async request

fig*_*ats 6 c# asynchronous system.reactive rx.net

I want a generic way to convert an async method to an observable. In my case, I'm dealing with methods that use HttpClient to fetch data from an API.

Let's say we have a method Task<string> GetSomeData() that needs to become a single IObservable<string>, where the values are generated as a combination of:

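  • Repeated periodic calls to GetSomeData() (for example every x seconds)
  • Manually triggered calls to GetSomeData() at any given time (for example when the user hits refresh).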

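Since there are two ways to trigger execution of GetSomeData(), concurrency can be an issue. To avoid requiring GetSomeData() to be thread-safe, I want to limit the concurrency so that only one thread executes the method at any given time. As a consequence, I need some strategy for handling overlapping requests. I made a (kind of) marble diagram describing the problem and the desired outcome: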

Marble diagram
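To make the "one execution at a time" constraint concrete, here is a minimal sketch that uses only the base class library, no Rx. The `SerializedCalls` class and its `Wrap` method are hypothetical names introduced for illustration. The sketch also assumes that queuing overlapping calls is acceptable, which is only one of the possible strategies; it does not drop or cancel anything.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the code in this question.
public static class SerializedCalls
{
    // Returns a function that forwards to `function`, but lets only one
    // invocation run at a time. Overlapping callers wait asynchronously
    // for their turn instead of running concurrently.
    public static Func<Task<T>> Wrap<T>(Func<Task<T>> function)
    {
        var gate = new SemaphoreSlim(1, 1); // A single permit: no overlapping
        return async () =>
        {
            await gate.WaitAsync();
            try { return await function(); }
            finally { gate.Release(); } // Always release, even on failure
        };
    }
}
```

With such a wrapper, something like `SerializedCalls.Wrap(GetSomeData)` could be handed to both the timer and the manual trigger, so the wrapped method itself would not have to be thread-safe.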

My instinct tells me there is a simple way to achieve this, so please give me some insights :)

This is the solution I've got so far. Unfortunately, it doesn't solve the concurrency problem.

    public class ObservableCreationWrapper<T>
    {
        private Subject<Unit> _manualCallsSubject = new Subject<Unit>();
        private Func<Task<T>> _methodToCall;
        private IObservable<T> _manualCalls;

        public IObservable<T> Stream { get; private set; }

        public ObservableCreationWrapper(Func<Task<T>> methodToCall, TimeSpan period)
        {
            _methodToCall = methodToCall;
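            _manualCalls = _manualCallsSubject.AsObservable()
                // The inner lambda takes a CancellationToken; it is named ct so
                // that it no longer shadows the outer lambda parameter
                .Select(_ => Observable.FromAsync(ct => methodToCall()))
                .Merge(1);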

            Stream = Observable.FromAsync(() => _methodToCall())
                .DelayRepeat(period)
                .Merge(_manualCalls);
        }

        public void TriggerAdditionalCall()
        {
            _manualCallsSubject.OnNext(Unit.Default);
        }
    }

Extension method for the delayed repeat:

static class Extensions
{
    public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) => source
        .Concat(
            Observable.Create<T>(async observer =>
            {
                await Task.Delay(delay);
                observer.OnCompleted();
            }))
        .Repeat();
}

Example of a service containing the method to generate the observable from:

class SomeService
{
    private int _ticks = 0;

    public async Task<string> GetSomeValueAsync()
    {
        // Just a hack to determine whether the request was triggered manually or by the timer
        var initiationWay = (new StackTrace()).GetFrame(4).GetMethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer";

        // Here we have a data race! We would like to limit access to this method
        var valueToReturn = $"{_ticks} ({initiationWay})";

        await Task.Delay(500);
        _ticks += 1; 
        return valueToReturn;
    }
}

Used like this (a data race will occur):

static async Task Main(string[] args)
{
    //Running this program will yield non deterministic results due to data-race in GetSomeValueAsync
    var someService = new SomeService();
    var stopwatch = Stopwatch.StartNew();
    var observableWrapper = new ObservableCreationWrapper<string>(someService.GetSomeValueAsync, TimeSpan.FromMilliseconds(2000));
    observableWrapper.Stream
        .Take(6)
        .Subscribe(x => 
            {
                Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} finished");
            });

    await Task.Delay(4000);
    observableWrapper.TriggerAdditionalCall();
    observableWrapper.TriggerAdditionalCall();
    Console.ReadLine();
}

The*_*ias 4

Here is my take on this problem:


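Update: By borrowing ideas from Enigmativity's answer, I was able to greatly simplify my suggested solution. The Observable.StartAsync method handles the messy business of cancellation automatically¹, and overlapping executions can be prevented simply by using a SemaphoreSlim.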

/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// function that is invoked periodically and manually. Overlapping invocations
/// are prevented. Timer ticks that would cause overlapping are ignored.
/// Manual invocations cancel previous invocations, and restart the timer.
/// </summary>
public static IObservable<T> PeriodicAndManual<T>(
    Func<bool, CancellationToken, Task<T>> functionAsync,
    TimeSpan period,
    out Action manualInvocation)
{
    // Arguments validation omitted
    var manualSubject = new Subject<bool>();
    manualInvocation = () => manualSubject.OnNext(true);
    return Observable.Defer(() =>
    {
        var semaphore = new SemaphoreSlim(1, 1); // Ensure no overlapping
        return Observable
            .Interval(period)
            .Select(_ => false) // Not manual
            .Merge(manualSubject)
            .TakeUntil(isManual => isManual) // Stop on first manual
            .Repeat() // ... and restart the timer
            .Prepend(false) // Skip the initial interval delay
            .Select(isManual =>
            {
                if (isManual)
                {
                    // Triggered manually
                    return Observable.StartAsync(async ct =>
                    {
                        await semaphore.WaitAsync(ct);
                        try { return await functionAsync(isManual, ct); }
                        finally { semaphore.Release(); }
                    });
                }
                else if (semaphore.Wait(0))
                {
                    // Triggered by the timer and semaphore acquired synchronously
                    return Observable
                        .StartAsync(ct => functionAsync(isManual, ct))
                        .Finally(() => semaphore.Release());
                }
                return null; // Otherwise ignore the signal
            })
            .Where(op => op != null)
            .Switch(); // Pending operations are unsubscribed and canceled
    });
}

The out Action manualInvocation parameter is the mechanism that triggers a manual invocation.
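The cancellation behavior that the Switch operator relies on can be observed in isolation. The following is a minimal sketch, assuming the System.Reactive package is referenced; the class name `StartAsyncCancellationDemo` and the delays are arbitrary choices made for illustration. It starts one operation with Observable.StartAsync and then disposes the subscription, which should signal the CancellationToken passed to the asynchronous function.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

class StartAsyncCancellationDemo
{
    static async Task Main()
    {
        // StartAsync invokes the function immediately. The token it passes
        // is tied to the subscriptions of the returned sequence.
        var operation = Observable.StartAsync(async ct =>
        {
            try
            {
                await Task.Delay(5000, ct); // Simulates a slow request
                return "completed";
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("The operation was canceled");
                throw;
            }
        });

        var subscription = operation.Subscribe(Console.WriteLine);

        await Task.Delay(500);
        subscription.Dispose(); // Unsubscribing requests cancellation
        await Task.Delay(100);  // Give the canceled function time to report
    }
}
```

In PeriodicAndManual the disposal is not done by hand: Switch unsubscribes from the previous inner sequence whenever a new one arrives, which is how a manual invocation ends up canceling the operation that is still pending.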


Usage example:

int ticks = 0;
var subscription = PeriodicAndManual(async (isManual, token) =>
{
    var id = $"{++ticks} " + (isManual ? "manual" : "periodic");
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Begin {id}");
    await Task.Delay(500, token);
    return id;
}, TimeSpan.FromMilliseconds(1000), out var manualInvocation)
.Do(x => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Received #{x}"))
.Subscribe();

await Task.Delay(3200);
manualInvocation();
await Task.Delay(200);
manualInvocation();
await Task.Delay(3200);

subscription.Dispose();

Output:


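The technique of using the Scan and DistinctUntilChanged operators to drop elements while the previous asynchronous operation is running is borrowed from this question.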


¹ It seems that the Rx library does not handle this messy business satisfactorily, since it simply omits disposing of the CancellationTokenSources that it creates.
