小编foo*_*y42的帖子

Reactive Extensions:订阅者内的并发

我试图围绕Reactive Extensions对并发性的支持,并且很难获得我追求的结果.所以我可能还没有得到它.

我有一个源,它将数据发送到流中的速度快于订阅者可以使用它的速度.我更喜欢配置流,以便使用另一个线程为流中的每个新项调用订阅者,以便订阅者同时运行多个线程.我能够确保订户的线程安全性.

以下示例演示了此问题:

Observable.Interval( TimeSpan.FromSeconds(1))
    .Do( x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
                                DateTime.Now, 
                                Thread.CurrentThread.ManagedThreadId, x))
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(x =>
               {
                   Console.WriteLine("{0} Thread: {1} Observed value: {2}",
                                     DateTime.Now,
                                     Thread.CurrentThread.ManagedThreadId, x);
                   Thread.Sleep(5000); // Simulate long work time
               });
Run Code Online (Sandbox Code Playgroud)

控制台输出如下所示(删除日期):

4:25:20 PM Thread: 6 Source value: 0
4:25:20 PM Thread: 11 Observed value: 0
4:25:21 PM Thread: 12 Source value: 1
4:25:22 PM Thread: 12 Source value: 2
4:25:23 PM Thread: 6 Source value: 3
4:25:24 PM Thread: 6 …
Run Code Online (Sandbox Code Playgroud)

c# concurrency system.reactive

15
推荐指数
1
解决办法
3609
查看次数

标签 统计

c# ×1

concurrency ×1

system.reactive ×1