Rx - 在新线程上消耗每个项目

Luk*_*kas 4 c# multithreading system.reactive

假设我有这样的代码:

static void Main(string[] args)
    {
        var scheduler = NewThreadScheduler.Default;
        var enumerable = Enumerable.Range(0, 100);

        enumerable
            .ToObservable(scheduler)
            .SubscribeOn(scheduler)
            .Subscribe(item =>
            {
                Console.WriteLine("Consuming {0} on Thread: {1}", item, Thread.CurrentThread.ManagedThreadId);

                // simulate long running operation
                Thread.Sleep(1000);
            });

        Console.ReadKey();
    }
Run Code Online (Sandbox Code Playgroud)

正如您一样,我将 IEnumerable 转换为 IObservable。然后我想在新线程上使用每个项目,所以我使用了 SubsribeOn(scheduler)。不幸的是,每次迭代都在同一个线程上工作,因此下一次迭代会阻塞。

结果是:

Consuming 0 on Thread: 4
Consuming 1 on Thread: 4
Consuming 2 on Thread: 4
Consuming 3 on Thread: 4
Consuming 4 on Thread: 4
....
Run Code Online (Sandbox Code Playgroud)

是否有可能强制这种行为?

Jam*_*rld 5

您所看到的行为完全是设计使然。

Rx 的基础是它的语法,它声明流被定义为零个或多个OnNext调用的序列,后跟可选的OnErroror OnCompleted

特别是,Rx 语法规定这些消息中的每一个都为给定的订阅者顺序传递。

所以你看到的是正确的行为 - 没有OnNext处理程序的并发执行。鉴于这种刻意的约束,为每个线程创建一个新线程OnNext将是非常浪费的。

在幕后,如果通过远远不够跟踪代码,你会看到,NewThreadScheduler利用一个EventLoopScheduler专门的再利用线程为每个用户。这个绰号NewThreadScheduler真正说明了每个订阅者获得一个新线程的事实,而不是每个事件。

要看到这一点,请修改您的代码,以便我们有两个以不同速度运行的订阅者。你会看到每个线程都有自己的线程,并按照自己的节奏进行,速度越慢不受阻碍:

var scheduler = NewThreadScheduler.Default;
var enumerable = Enumerable.Range(0, 100);

var xs = enumerable
    .ToObservable(scheduler)
    .SubscribeOn(scheduler);

xs.Subscribe(item =>
{
    Console.WriteLine("Slow consuming {0} on Thread: {1}",
        item, Thread.CurrentThread.ManagedThreadId);

    // simulate slower long running operation
    Thread.Sleep(1000);
});

xs.Subscribe(item =>
{
    Console.WriteLine("Fast consuming {0} on Thread: {1}",
        item, Thread.CurrentThread.ManagedThreadId);

    // simulate faster long running operation
    Thread.Sleep(500);
});

Console.ReadKey();
Run Code Online (Sandbox Code Playgroud)

您可能会发现通读Rx 设计指南非常有帮助。

允许对订阅者中的事件进行并发处理的愿望表明,您可能会追求具有多个消费者的队列 - 为此,您可以查看 Rx 之外的内容,例如 BCL ConcurrentQueue<T>。还可以在不违反 Rx 语法约束的情况下将消息投射到异步调用中并在完成时收集结果。

例如,这里有一些类似的代码,它在不同的时间长度内随机处理流中的每个数字。你可以看到结果是乱序的,相互之间没有阻碍。这不是很棒的代码,但它说明了这一点。如果异步工作是 IO 绑定的,它可能真的很有用。还要注意使用Observable.Rangewhich 避免使用Enumerable.Range().ToObservable()组合。在 .NET Core 2.0 上测试:

var random = new Random();

// stop the threadpool from throttling us as it grows
ThreadPool.SetMinThreads(100, 1);

Observable.Range(0, 100)
.SelectMany(x => Observable.Start(() =>
{
    Console.WriteLine($"Started {x}");
    Thread.Sleep(random.Next(1, 10) * 1000);
    return x;
}))
.Subscribe(item =>
{
    Console.WriteLine($"{item}, {Thread.CurrentThread.ManagedThreadId}");
});

Console.ReadKey();
Run Code Online (Sandbox Code Playgroud)