foo*_*y42 15 c# concurrency system.reactive
我试图围绕Reactive Extensions对并发性的支持,并且很难获得我追求的结果.所以我可能还没有得到它.
我有一个源,它将数据发送到流中的速度快于订阅者可以使用它的速度.我更喜欢配置流,以便使用另一个线程为流中的每个新项调用订阅者,以便订阅者同时运行多个线程.我能够确保订户的线程安全性.
以下示例演示了此问题:
Observable.Interval( TimeSpan.FromSeconds(1))
.Do( x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x))
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(x =>
{
Console.WriteLine("{0} Thread: {1} Observed value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x);
Thread.Sleep(5000); // Simulate long work time
});
Run Code Online (Sandbox Code Playgroud)
控制台输出如下所示(删除日期):
4:25:20 PM Thread: 6 Source value: 0
4:25:20 PM Thread: 11 Observed value: 0
4:25:21 PM Thread: 12 Source value: 1
4:25:22 PM Thread: 12 Source value: 2
4:25:23 PM Thread: 6 Source value: 3
4:25:24 PM Thread: 6 Source value: 4
4:25:25 PM Thread: 11 Observed value: 1
4:25:25 PM Thread: 12 Source value: 5
4:25:26 PM Thread: 6 Source value: 6
Run Code Online (Sandbox Code Playgroud)
请注意"观察值"时间增量.即使源继续以比订户可以处理它更快的速度发送数据,也不会并行调用订户.虽然我可以想象当前行为有用的一系列场景,但我需要能够在消息可用时立即处理它们.
我已经使用ObserveOn方法尝试了几种Scheduler变体,但它们似乎都没有做我想要的.
除了在Subscribe动作中分离一个线程以执行长时间运行的工作之外,还有什么我遗漏的东西可以同时向订阅者传送数据吗?
提前感谢所有的答案和建议!
Jam*_*rld 17
这里的根本问题是你希望Rx observable以一种真正违反可观察量工作规则的方式调度事件.我认为在这里查看Rx设计指南是有益的:http://go.microsoft.com/fwlink/?LinkID = 205219 - 最值得注意的是,"4.2假设观察者实例以序列化方式调用".即你并不意味着能够并行运行OnNext调用.事实上,Rx的排序行为对其设计理念至关重要.
如果查看源代码,您将看到Rx在派生的ScheduledObserver<T>类中强制执行此行为ObserveOnObserver<T>... OnNexts从内部队列调度,每个必须在调度下一个之前完成 - 在给定的执行上下文中.Rx不允许单个订户的OnNext调用同时执行.
这并不是说你不能让多个子纤维以不同的速率执行.实际上,如果您按如下方式更改代码,则很容易看到:
var source = Observable.Interval(TimeSpan.FromSeconds(1))
.Do(x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x))
.ObserveOn(NewThreadScheduler.Default);
var subscription1 = source.Subscribe(x =>
{
Console.WriteLine("Subscriber 1: {0} Thread: {1} Observed value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x);
Thread.Sleep(1000); // Simulate long work time
});
var subscription2 = source.Subscribe(x =>
{
Console.WriteLine("Subscriber 2: {0} Thread: {1} Observed value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x);
Thread.Sleep(5000); // Simulate long work time
});
Run Code Online (Sandbox Code Playgroud)
现在,您将看到订阅服务器1超越订阅服务器2.
你不能轻易做的就是让一个观察者做一些事情,比如向一个"准备好"的用户发送一个OnNext呼叫 - 这就是你要求的迂回方式.我还假设你不想在消费者缓慢的情况下为每个OnNext创建一个新线程!
在这种情况下,听起来你最好只使用一个订阅者,除了尽可能快地将工作推送到队列之外什么也不做,然后由许多消耗的工作线程提供服务,然后你可以根据需要进行控制步伐.
| 归档时间: |
|
| 查看次数: |
3609 次 |
| 最近记录: |