我试图围绕Reactive Extensions对并发性的支持,并且很难获得我追求的结果.所以我可能还没有得到它.
我有一个源,它将数据发送到流中的速度快于订阅者可以使用它的速度.我更喜欢配置流,以便使用另一个线程为流中的每个新项调用订阅者,以便订阅者同时运行多个线程.我能够确保订户的线程安全性.
以下示例演示了此问题:
Observable.Interval( TimeSpan.FromSeconds(1))
.Do( x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x))
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(x =>
{
Console.WriteLine("{0} Thread: {1} Observed value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x);
Thread.Sleep(5000); // Simulate long work time
});
Run Code Online (Sandbox Code Playgroud)
控制台输出如下所示(删除日期):
4:25:20 PM Thread: 6 Source value: 0
4:25:20 PM Thread: 11 Observed value: 0
4:25:21 PM Thread: 12 Source value: 1
4:25:22 PM Thread: 12 Source value: 2
4:25:23 PM Thread: 6 Source value: 3
4:25:24 PM Thread: 6 …Run Code Online (Sandbox Code Playgroud)