如何在 Rx.Net 中实现排放映射处理程序?

wh1*_*t1k 5 .net c# system.reactive rx.net

我正在寻找类似于exhaustMapfrom 运算符的东西rxjs,但RX.NET似乎没有这样的运算符。

我需要实现的是,在源流的每个元素上,我需要启动一个async处理程序,在它完成之前,我想从源中删除任何元素。处理程序完成后,立即恢复获取元素。

我不想在每个元素上启动一个异步处理程序——当处理程序运行时,我想删除源元素。

我还怀疑我需要在这里巧妙地使用 defer 运算符?

谢谢!

The*_*ias 9

这是ExhaustMap运算符的实现。源 observable 被投影到IObservable<Task<TResult>>,其中每个后续任务要么是前一个,如果它仍在运行,要么是与当前项目相关联的新任务。然后用DistinctUntilChanged操作符删除重复出现的相同任务,最后用操作符将 observable 展平Concat

/// <summary>Invokes an asynchronous function for each element of an observable
/// sequence, ignoring elements that are emitted before the completion of an
/// asynchronous function of a preceding element.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, Task<TResult>> function)
{
    return source
        .Scan(Task.FromResult<TResult>(default), (previousTask, item) =>
        {
            return !previousTask.IsCompleted ? previousTask : HideIdentity(function(item));
        })
        .DistinctUntilChanged()
        .Concat();

    async Task<TResult> HideIdentity(Task<TResult> task) => await task;
}
Run Code Online (Sandbox Code Playgroud)

function不保证返回的任务是不同的,因此需要HideIdentity返回任务的不同包装器的本地函数。

用法示例:

Observable
    .Interval(TimeSpan.FromMilliseconds(200))
    .Select(x => (int)x + 1)
    .Take(10)
    .Do(x => Console.WriteLine($"Input: {x}"))
    .ExhaustMap(async x => { await Task.Delay(x % 3 == 0 ? 500 : 100); return x; })
    .Do(x => Console.WriteLine($"Result: {x}"))
    .Wait();
Run Code Online (Sandbox Code Playgroud)

输出:

/// <summary>Invokes an asynchronous function for each element of an observable
/// sequence, ignoring elements that are emitted before the completion of an
/// asynchronous function of a preceding element.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, Task<TResult>> function)
{
    return source
        .Scan(Task.FromResult<TResult>(default), (previousTask, item) =>
        {
            return !previousTask.IsCompleted ? previousTask : HideIdentity(function(item));
        })
        .DistinctUntilChanged()
        .Concat();

    async Task<TResult> HideIdentity(Task<TResult> task) => await task;
}
Run Code Online (Sandbox Code Playgroud)

更新:这是一个替代实现,其中function产生一个IObservable<TResult>而不是一个Task<TResult>

/// <summary>Projects each element to an observable sequence, which is merged
/// in the output observable sequence only if the previous projected observable
/// sequence has completed.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, IObservable<TResult>> function)
{
    return Observable.Using(() => new SemaphoreSlim(1, 1),
        semaphore => source.SelectMany(item => ProjectItem(item, semaphore)));

    IObservable<TResult> ProjectItem(TSource item, SemaphoreSlim semaphore)
    {
        // Attempt to acquire the semaphore immediately. If successful, return
        // a sequence that releases the semaphore when terminated. Otherwise,
        // return immediately an empty sequence.
        return Observable.If(() => semaphore.Wait(0),
            Observable
                .Defer(() => function(item))
                .Finally(() => semaphore.Release())
        );
    }
}
Run Code Online (Sandbox Code Playgroud)

这个很有趣,因为如果需要,它可以很容易地修改并行度。只需SemaphoreSlim使用initialCount不同于 1的参数实例化。

  • 不错,西奥多!我已经离开 rx 标签有一段时间了 - 很高兴看到你在坚守堡垒。 (2认同)