Unwrapping IObservable<Task<T>> into IObservable<T>

yal*_*lie 5 c# task-parallel-library system.reactive .net-4.5 rx.net

Is there a way to unwrap an IObservable<Task<T>> into an IObservable<T> while keeping the same order of events, like this?

Tasks:  ----a-------b--c----------d------e---f---->
Values: -------A-----------B--C------D-----E---F-->

Suppose I have a desktop application that consumes a stream of messages, some of which require heavy post-processing:

IObservable<Message> streamOfMessages = ...;

IObservable<Task<Result>> streamOfTasks = streamOfMessages
    .Select(async msg => await PostprocessAsync(msg));

IObservable<Result> streamOfResults = ???; // unwrap streamOfTasks

I can imagine two ways of handling this.

First, I can subscribe to streamOfTasks using an async event handler:

streamOfTasks.Subscribe(async task =>
{
    var result = await task;
    Display(result);
});

Second, I can transform streamOfTasks using Observable.Create, like this:

var streamOfResults =
    from task in streamOfTasks
    from value in Observable.Create<Result>(async (obs, cancel) =>
    {
        var v = await task;
        obs.OnNext(v);

        // TODO: don't know when to call obs.OnCompleted()
    })
    select value;

streamOfResults.Subscribe(result => Display(result));

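Either way, the order of messages is not preserved: later messages that need no post-processing come out sooner than earlier messages that do. Both of my solutions process incoming messages in parallel, but I would like them to be processed sequentially, one by one.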

I could write a simple task queue that processes one task at a time, but that may be overkill. It seems to me that I'm missing something obvious.


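UPD. I've written a sample console program to demonstrate my approaches. So far, none of the solutions preserves the original order of events. Here is the program's output: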

Timer: 0
Timer: 1
Async handler: 1
Observable.Create: 1
Observable.FromAsync: 1
Timer: 2
Async handler: 2
Observable.Create: 2
Observable.FromAsync: 2
Observable.Create: 0
Async handler: 0
Observable.FromAsync: 0

Here is the full source code:

// "C:\Program Files (x86)\MSBuild\14.0\Bin\csc.exe" test.cs /r:System.Reactive.Core.dll /r:System.Reactive.Linq.dll /r:System.Reactive.Interfaces.dll

using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        Console.WriteLine("Press ENTER to exit.");

        // the source stream
        var timerEvents = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));
        timerEvents.Subscribe(x => Console.WriteLine($"Timer: {x}"));

        // solution #1: using async event handler
        timerEvents.Subscribe(async x =>
        {
            var result = await PostprocessAsync(x);
            Console.WriteLine($"Async handler: {x}");
        });

        // solution #2: using Observable.Create
        var processedEventsV2 =
            from task in timerEvents.Select(async x => await PostprocessAsync(x))
            from value in Observable.Create<long>(async (obs, cancel) =>
            {
                var v = await task;
                obs.OnNext(v);
            })
            select value;
        processedEventsV2.Subscribe(x => Console.WriteLine($"Observable.Create: {x}"));

        // solution #3: using FromAsync, as answered by @Enigmativity
        var processedEventsV3 =
            from msg in timerEvents
            from result in Observable.FromAsync(() => PostprocessAsync(msg))
            select result;

        processedEventsV3.Subscribe(x => Console.WriteLine($"Observable.FromAsync: {x}"));

        Console.ReadLine();
    }

    static async Task<long> PostprocessAsync(long x)
    {
        // some messages require long post-processing
        if (x % 3 == 0)
        {
            await Task.Delay(TimeSpan.FromSeconds(2.5));
        }

        // and some don't
        return x;
    }
}

yal*_*lie 2

Combining @Enigmativity's simple approach with @VMAtm's idea of attaching a counter, plus some code snippets from this SO question, I came up with this solution:

// usage
var processedStream = timerEvents.SelectAsync(async t => await PostprocessAsync(t));

processedStream.Subscribe(x => Console.WriteLine($"Processed: {x}"));

// my sample console program prints the events ordered properly:
Timer: 0
Timer: 1
Timer: 2
Processed: 0
Processed: 1
Processed: 2
Timer: 3
Timer: 4
Timer: 5
Processed: 3
Processed: 4
Processed: 5
....

Here is my SelectAsync extension method, which transforms an IObservable<Task<TSource>> into an IObservable<TResult> while keeping the original order of events:

// requires: using System.Collections.Generic; using System.Reactive.Linq;
//           using System.Threading; using System.Threading.Tasks;
// (must be declared inside a static class)
public static IObservable<TResult> SelectAsync<TSource, TResult>(
    this IObservable<TSource> src,
    Func<TSource, Task<TResult>> selectorAsync)
{
    return Observable.Create<TResult>(observer =>
    {
        // using local variable for counter is easier than src.Scan(...);
        // it lives inside Create so that every subscription counts from 0
        var counter = 0;
        var streamOfTasks =
            from source in src
            from result in Observable.FromAsync(async () => new
            {
                Index = Interlocked.Increment(ref counter) - 1,
                Result = await selectorAsync(source)
            })
            select result;

        // buffer the results coming out of order
        var index = 0;
        var buffer = new Dictionary<int, TResult>();

        return streamOfTasks.Subscribe(item =>
        {
            buffer.Add(item.Index, item.Result);

            TResult result;
            while (buffer.TryGetValue(index, out result))
            {
                buffer.Remove(index);
                observer.OnNext(result);
                index++;
            }
        },
        observer.OnError,      // forward failures of the source or the selector
        observer.OnCompleted); // forward completion of the source
    });
}

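I'm not particularly happy with my solution, because it looks too complicated to me, but at least it doesn't require any external dependencies. I use a simple Dictionary here to buffer and reorder the task results, because the subscriber doesn't need to be thread-safe (subscriptions are never called concurrently).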
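The reordering step can be looked at in isolation, without Rx. The sketch below is only an illustration of the dictionary-based buffering described above; ReorderBuffer and ReorderBufferDemo are hypothetical names, not part of Rx or of the extension method, and the class assumes (as the answer does) that it is never called concurrently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of Rx: releases values in index order
// regardless of the order in which they arrive. Not thread-safe.
public sealed class ReorderBuffer<T>
{
    private readonly Dictionary<int, T> pending = new Dictionary<int, T>();
    private int next; // index of the next value allowed to leave the buffer

    // Stores one value and returns every value that is now ready, in order.
    public List<T> Add(int index, T value)
    {
        pending.Add(index, value);

        var ready = new List<T>();
        T item;
        while (pending.TryGetValue(next, out item))
        {
            pending.Remove(next);
            ready.Add(item);
            next++;
        }
        return ready;
    }
}

public static class ReorderBufferDemo
{
    public static void Main()
    {
        var buffer = new ReorderBuffer<string>();

        // Arrival order mirrors the question's output: indices 1, 2, then 0.
        Console.WriteLine(string.Join(",", buffer.Add(1, "B"))); // empty: index 0 is missing
        Console.WriteLine(string.Join(",", buffer.Add(2, "C"))); // empty: still waiting for 0
        Console.WriteLine(string.Join(",", buffer.Add(0, "A"))); // A,B,C
    }
}
```

Note that the buffer grows without bound while an early item is still being processed, so one very slow message holds back every later result.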

Any comments or suggestions are welcome. I still hope to find a native RX way of doing this without a custom buffering extension method.
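One candidate for such a native approach is to combine Observable.FromAsync with Concat, both of which ship with System.Reactive. FromAsync defers the call until something subscribes, and Concat subscribes to one inner sequence at a time, so messages are processed one by one in arrival order. What follows is a minimal sketch rather than a tested replacement for SelectAsync: OrderedUnwrapSketch and ProcessInOrder are illustrative names, the source is a finite Observable.Range instead of a timer, and the delay is shortened to 200 ms.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class OrderedUnwrapSketch
{
    // Same shape as PostprocessAsync in the question: every third message is slow.
    static async Task<long> PostprocessAsync(long x)
    {
        if (x % 3 == 0)
        {
            await Task.Delay(TimeSpan.FromMilliseconds(200));
        }
        return x;
    }

    // Wraps each message in a deferred async call, then lets Concat run the
    // calls strictly one after another, in the order the messages arrived.
    public static IObservable<long> ProcessInOrder(IObservable<long> source)
    {
        return source
            .Select(msg => Observable.FromAsync(() => PostprocessAsync(msg)))
            .Concat();
    }

    static void Main()
    {
        var messages = Observable.Range(0, 6).Select(i => (long)i);

        // ToList().Wait() blocks until the sequence completes; used here
        // only to keep the console demo short.
        IList<long> results = ProcessInOrder(messages).ToList().Wait();

        Console.WriteLine(string.Join(", ", results)); // 0, 1, 2, 3, 4, 5
    }
}
```

The trade-off is that nothing runs in parallel: while a slow message is being processed, Concat queues the later ones, and that queue is unbounded if the source keeps producing faster than PostprocessAsync can finish.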