Luk*_*kas 4 c# multithreading system.reactive
假设我有这样的代码:
static void Main(string[] args)
{
var scheduler = NewThreadScheduler.Default;
var enumerable = Enumerable.Range(0, 100);
enumerable
.ToObservable(scheduler)
.SubscribeOn(scheduler)
.Subscribe(item =>
{
Console.WriteLine("Consuming {0} on Thread: {1}", item, Thread.CurrentThread.ManagedThreadId);
// simulate long running operation
Thread.Sleep(1000);
});
Console.ReadKey();
}
Run Code Online (Sandbox Code Playgroud)
正如您一样,我将 IEnumerable 转换为 IObservable。然后我想在新线程上使用每个项目,所以我使用了 SubsribeOn(scheduler)。不幸的是,每次迭代都在同一个线程上工作,因此下一次迭代会阻塞。
结果是:
Consuming 0 on Thread: 4
Consuming 1 on Thread: 4
Consuming 2 on Thread: 4
Consuming 3 on Thread: 4
Consuming 4 on Thread: 4
....
Run Code Online (Sandbox Code Playgroud)
是否有可能强制这种行为?
您所看到的行为完全是设计使然。
Rx 的基础是它的语法,它声明流被定义为零个或多个OnNext调用的序列,后跟可选的OnErroror OnCompleted。
特别是,Rx 语法规定这些消息中的每一个都为给定的订阅者顺序传递。
所以你看到的是正确的行为 - 没有OnNext处理程序的并发执行。鉴于这种刻意的约束,为每个线程创建一个新线程OnNext将是非常浪费的。
在幕后,如果通过远远不够跟踪代码,你会看到,NewThreadScheduler利用一个EventLoopScheduler专门的再利用线程为每个用户。这个绰号NewThreadScheduler真正说明了每个订阅者获得一个新线程的事实,而不是每个事件。
要看到这一点,请修改您的代码,以便我们有两个以不同速度运行的订阅者。你会看到每个线程都有自己的线程,并按照自己的节奏进行,速度越慢不受阻碍:
var scheduler = NewThreadScheduler.Default;
var enumerable = Enumerable.Range(0, 100);
var xs = enumerable
.ToObservable(scheduler)
.SubscribeOn(scheduler);
xs.Subscribe(item =>
{
Console.WriteLine("Slow consuming {0} on Thread: {1}",
item, Thread.CurrentThread.ManagedThreadId);
// simulate slower long running operation
Thread.Sleep(1000);
});
xs.Subscribe(item =>
{
Console.WriteLine("Fast consuming {0} on Thread: {1}",
item, Thread.CurrentThread.ManagedThreadId);
// simulate faster long running operation
Thread.Sleep(500);
});
Console.ReadKey();
Run Code Online (Sandbox Code Playgroud)
您可能会发现通读Rx 设计指南非常有帮助。
允许对订阅者中的事件进行并发处理的愿望表明,您可能会追求具有多个消费者的队列 - 为此,您可以查看 Rx 之外的内容,例如 BCL ConcurrentQueue<T>。还可以在不违反 Rx 语法约束的情况下将消息投射到异步调用中并在完成时收集结果。
例如,这里有一些类似的代码,它在不同的时间长度内随机处理流中的每个数字。你可以看到结果是乱序的,相互之间没有阻碍。这不是很棒的代码,但它说明了这一点。如果异步工作是 IO 绑定的,它可能真的很有用。还要注意使用Observable.Rangewhich 避免使用Enumerable.Range().ToObservable()组合。在 .NET Core 2.0 上测试:
var random = new Random();
// stop the threadpool from throttling us as it grows
ThreadPool.SetMinThreads(100, 1);
Observable.Range(0, 100)
.SelectMany(x => Observable.Start(() =>
{
Console.WriteLine($"Started {x}");
Thread.Sleep(random.Next(1, 10) * 1000);
return x;
}))
.Subscribe(item =>
{
Console.WriteLine($"{item}, {Thread.CurrentThread.ManagedThreadId}");
});
Console.ReadKey();
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1128 次 |
| 最近记录: |