NewThreadScheduler.Default计划在同一个线程上的所有工作

And*_*rpi 8 c# concurrency system.reactive

我目前正试图用RX .NET来解决并发问题,并对某些事情感到困惑.我想并行运行四个相对较慢的任务,所以我认为NewThreadScheduler.Default这是要走的路,因为它"代表一个在一个单独的线程上调度每个工作单元的对象"..

这是我的设置代码:

    static void Test()
    {
        Console.WriteLine("Starting. Thread {0}", Thread.CurrentThread.ManagedThreadId);

        var query = Enumerable.Range(1, 4);
        var obsQuery = query.ToObservable(NewThreadScheduler.Default);
        obsQuery.Subscribe(DoWork, Done);

        Console.WriteLine("Last line. Thread {0}", Thread.CurrentThread.ManagedThreadId);
    }

    static void DoWork(int i)
    {
        Thread.Sleep(500);
        Console.WriteLine("{0} Thread {1}", i, Thread.CurrentThread.ManagedThreadId);
    }

    static void Done()
    {
        Console.WriteLine("Done. Thread {0}", Thread.CurrentThread.ManagedThreadId);
    }
Run Code Online (Sandbox Code Playgroud)

我假设"X线程Y"每次都会输出不同的线程ID,但实际输出是:

Starting. Thread 1
Last line. Thread 1
1 Thread 3
2 Thread 3
3 Thread 3
4 Thread 3
Done. Thread 3
Run Code Online (Sandbox Code Playgroud)

所有工作都按顺序在同一个新线程上进行,这不是我所期待的.

我假设我错过了什么,但我无法弄清楚是什么.

Bry*_*son 10

可观察查询有两个部分,即Query自身和Subscription.(这也是ObserveOn和SubscribeOn运算符之间的区别.)

Query

Enumerable
    .Range(1, 4)
    .ToObservable(NewThreadScheduler.Default);
Run Code Online (Sandbox Code Playgroud)

这将创建一个observable,为该系统的默认值生成值NewThreadScheduler.

您的订阅是

obsQuery.Subscribe(DoWork, Done);
Run Code Online (Sandbox Code Playgroud)

这将DoWork针对每个生成的值运行,QueryDoneQuery完成OnComplete调用时生成.我不认为有什么保证会调用subscribe方法中的函数,实际上如果查询的所有值都是在运行订阅的线程的同一个线程上产生的.它们似乎也在制作它所以所有的订阅调用都是在同一个线程上完成的,这很可能是为了摆脱许多常见的多线程错误.

所以你有两个问题,一个是你的日志记录,如果你改变你Query

Enumerable
    .Range(1, 4)
    .Do(x => Console.WriteLine("Query Value {0} produced on Thread {1}", x, Thread.CurrentThread.ManagedThreadId);
    .ToObservable(NewThreadScheduler.Default);
Run Code Online (Sandbox Code Playgroud)

您将看到在新线程上生成的每个值.

另一个问题是Rx的意图和设计之一.它的目的是,Query为长期运行的过程和Subscription是与结果有关系的短期方法.如果要将长时间运行的函数作为Rx Observable运行,最好的选择是使用Observable.ToAsync.