如何使用Reactive限制消耗顺序?

joh*_*y g 4 .net c# multithreading reactive-programming system.reactive

我们有一个应用程序,其中我们有一个物化的项目数组,我们将通过Reactive管道进行处理.它看起来有点像这样

EventLoopScheduler eventLoop = new EventLoopScheduler();
IScheduler concurrency = new TaskPoolScheduler(
    new TaskFactory(
        new LimitedConcurrencyLevelTaskScheduler(threadCount)));
IEnumerable<int> numbers = Enumerable.Range(1, itemCount);

// 1. transform on single thread
IConnectableObservable<byte[]> source = 
    numbers.Select(Transform).ToObservable(eventLoop).Publish();

// 2. naive parallelization, restricts parallelization to Work 
// only; chunk up sequence into smaller sequences and process
// in parallel, merging results
IObservable<int> final = source.
    Buffer(10).
    Select(
        batch =>
        batch.
        ToObservable(concurrency).
        Buffer(10).
        Select(
            concurrentBatch =>
            concurrentBatch.
            Select(Work).
            ToArray().
            ToObservable(eventLoop)).
        Merge()).
    Merge();

final.Subscribe();

source.Connect();
Await(final).Wait();
Run Code Online (Sandbox Code Playgroud)

如果你真的很想玩这个,那么替身方法就像

private async static Task Await(IObservable<int> final)
{
    await final.LastOrDefaultAsync();
}

private static byte[] Transform(int number)
{
    if (number == itemCount)
    {
        Console.WriteLine("numbers exhausted.");
    }
    byte[] buffer = new byte[1000000];
    Buffer.BlockCopy(bloat, 0, buffer, 0, bloat.Length);
    return buffer;
}

private static int Work(byte[] buffer)
{
    Console.WriteLine("t {0}.", Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(50);
    return 1;
}
Run Code Online (Sandbox Code Playgroud)

一点解释.Range(1, itemCount)模拟从数据源实现的原始输入.Transform模拟每个输入必须经过的浓缩过程,并导致更大的内存占用.Work是一个"冗长"的过程,它对转换后的输入进行操作.

理想情况下,我们希望最小化系统同时保持的转换输入的数量,同时通过并行化最大化吞吐量Work.内存中已转换输入的数量应为批量大小(10上图)乘以并发工作线程(threadCount).

因此,对于5个线程,我们应该Transform在任何给定时间保留50个项目; 如果这里的变换是一个1MB字节的缓冲区,那么我们预计在整个运行过程中内存消耗大约为50MB.

我发现的是完全不同的.也就是说,Reactive正在急切地消耗所有这些numbers,并且Transform它们在前面(由numbers exhausted.消息证明),导致前面的大量内存峰值(@ 1GB为1000 itemCount).

我的基本问题是:有没有办法实现我所需要的(即最小化消耗,受多线程批处理限制)?

更新:抱歉逆转詹姆斯; 起初,我没有想到paulpdaniels和Enigmativity的Work(Transform)应用组合(这与我们实际实现的性质有关,这比上面提供的简单场景更复杂),然而,经过一些进一步的实验,我可能能够应用相同的原则:即延迟转换直到批处理执行.

Eni*_*ity 7

你的代码犯了一些错误,导致你得出所有的结论.

首先,你做到了这一点:

IEnumerable<int> numbers = Enumerable.Range(1, itemCount);
Run Code Online (Sandbox Code Playgroud)

你已经使用了Enumerable.Range这意味着,当你打电话时,你numbers.Select(Transform)numbers尽可能快地烧掉所有的线程.Rx甚至没有机会做任何工作,因为到目前为止你的管道完全可以枚举.

下一个问题在您的订阅中:

final.Subscribe();

source.Connect();
Await(final).Wait();
Run Code Online (Sandbox Code Playgroud)

因为您调用final.Subscribe()&Await(final).Wait();您正在为finalobservable 创建两个单独的订阅.

由于source.Connect()中间存在第二个订阅可能错过了值.

所以,让我们尝试删除所有正在发生的事情,看看我们是否可以解决问题.

如果你谈到这个:

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .Select(n => Transform(n))
        .Select(bs => Work(bs));
Run Code Online (Sandbox Code Playgroud)

事情很顺利.最后这些数字已经耗尽,在我的机器上处理20个项目大约需要1秒钟.

但这是按顺序处理所有事情.并且该Work步骤提供背压Transform以减慢其消耗数字的速度.

让我们添加并发性.

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .Select(n => Transform(n))
        .SelectMany(bs => Observable.Start(() => Work(bs)));
Run Code Online (Sandbox Code Playgroud)

这在0.284秒内处理20个项目,并且在处理5个项目之后数字自行耗尽.这些数字不再有任何背压.基本上,调度程序正在将所有工作交给Observable.Start它,因此它可以立即为下一个数字做好准备.

让我们减少并发性.

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .Select(n => Transform(n))
        .SelectMany(bs => Observable.Start(() => Work(bs), concurrency));
Run Code Online (Sandbox Code Playgroud)

现在,20个项目在0.5秒内得到处理.在数字耗尽之前只有两个被处理.这是有道理的,因为我们将并发性限制为两个线程.但仍然没有对数字消费的背压,所以他们很快就被嚼起来了.

说完所有这些之后,我尝试用适当的背压构建一个查询,但我找不到方法.问题归结为这样一个事实,即它的Transform(...)表现要快得多,Work(...)所以它的完成速度要快得多.

那么对我来说显而易见的举动是这样的:

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .SelectMany(n => Observable.Start(() => Work(Transform(n)), concurrency));
Run Code Online (Sandbox Code Playgroud)

这直到最后才完成数字,并且它将处理限制为两个线程.它看起来对你想要的东西是正确的,除了我必须一起做Work(Transform(...)).


Jam*_*rld 5

事实上,你想要限制你正在做的工作量,这表明你应该提取数据,而不是把它推向你.我会忘记在这种情况下使用Rx,从根本上说,你所描述的并不是一个被动的应用程序.此外,Rx最适合连续处理物品; 它使用顺序事件流.

为什么不保持数据源可枚举,并使用PLinq,Parallel.ForEachDataFlow?所有这些听起来更适合您的问题.

  • Rx主要是一种查询可观察事件序列的方法.OP在这里有一组静态数据,他们希望以并行方式处理(我假设最大吞吐量).在这种情况下,我完全同意@JamesWorld,这是一个最适合TPL/PLinq的问题 (3认同)
  • 是的,这绝对是因为来源的本质 - 我应该更清楚(感谢@LeeCampbell,因为我认为你的评论非常清楚地阐明了这一点).但是,我会更进一步说,引入持续背压的任何问题都表明Rx不合适 - 同样任何限制Rx管道吞吐量的解决方案都会导致背压持续增加也表明Rx不合适.总而言之,就像Enigmativity当前接受的答案一样优雅,我担心它会把人们推向错误的道路. (3认同)
  • 我同意PLinq或Parallel.ForEach在这种情况下可能是更好的选择,但这不是因为源的性质而不是操作的性质吗?如果源实际上是一个以突发方式触发事件的WebSocket,你仍然会说它不是一个Rx应用程序吗? (2认同)