Mat*_*tin 1 c# concurrency observablecollection parallel-extensions system.reactive
我一直在尝试使用Rx和可观察集合实现一个简单的生产者 - 消费者模式.我还需要能够轻松限制用户数量.我在并行扩展中看到过很多对LimitedConcurrencyLevelTaskScheduler的引用,但我似乎无法使用多个线程.
我想我做的很傻,所以我希望有人可以解释一下.在下面的单元测试中,我希望使用多个(2)线程来使用阻塞集合中的字符串.我究竟做错了什么?
[TestClass]
public class LimitedConcurrencyLevelTaskSchedulerTestscs
{
private ConcurrentBag<string> _testStrings = new ConcurrentBag<string>();
ConcurrentBag<int> _threadIds= new ConcurrentBag<int>();
[TestMethod]
public void WhenConsumingFromBlockingCollection_GivenLimitOfTwoThreads_TwoThreadsAreUsed()
{
// Setup the command queue for processing combinations
var commandQueue = new BlockingCollection<string>();
var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(2));
var scheduler = new TaskPoolScheduler(taskFactory);
commandQueue.GetConsumingEnumerable()
.ToObservable(scheduler)
.Subscribe(Go, ex => { throw ex; });
var iterationCount = 100;
for (int i = 0; i < iterationCount; i++)
{
commandQueue.Add(string.Format("string {0}", i));
}
commandQueue.CompleteAdding();
while (!commandQueue.IsCompleted)
{
Thread.Sleep(100);
}
Assert.AreEqual(iterationCount, _testStrings.Count);
Assert.AreEqual(2, _threadIds.Distinct().Count());
}
private void Go(string testString)
{
_testStrings.Add(testString);
_threadIds.Add(Thread.CurrentThread.ManagedThreadId);
}
}
Run Code Online (Sandbox Code Playgroud)
每个人似乎都在与Rx一起学习相同的学习曲线.要理解的是,除非您明确地生成强制并行性的查询,否则Rx不会执行并行处理.调度程序不会引入并行性.
Rx有一个行为契约,表示串联产生零个或多个值(无论可能使用多少个线程),一个接一个,没有重叠,最后是可选的单个错误或一个完整的消息,然后别的什么.
这通常写成OnNext*(OnError|OnCompleted).
所有调度程序都会定义规则,以确定如果调度程序没有正在处理当前observable的挂起值,则处理新值的线程.
现在拿你的代码:
var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(2));
var scheduler = new TaskPoolScheduler(taskFactory);
Run Code Online (Sandbox Code Playgroud)
这表示调度程序将在两个线程之一上运行订阅的值.但这并不意味着它会为所产生的每一个价值做到这一点.请记住,由于值是一个接一个地串行生成的,因此最好重新使用现有线程而不是创建新线程的高成本.因此,如果在处理完当前值之前在调度程序上调度了新值,则Rx所做的是重用现有线程.
这是关键 - 如果在完成现有值的处理之前安排了新值,它将重新使用该线程.
所以你的代码执行此操作:
commandQueue.GetConsumingEnumerable()
.ToObservable(scheduler)
.Subscribe(Go, ex => { throw ex; });
Run Code Online (Sandbox Code Playgroud)
这意味着调度程序只会在第一个值出现时创建一个线程.但是当昂贵的线程创建操作完成时,那么添加值的代码commandQueue也已完成,因此它已将它们全部排队,因此它可以更有效地使用单个线程而不是创建昂贵的第二个线程.
为避免这种情况,您需要构造查询以引入并行性.
这是如何做:
public void WhenConsumingFromBlockingCollection_GivenLimitOfTwoThreads_TwoThreadsAreUsed()
{
var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(2));
var scheduler = new TaskPoolScheduler(taskFactory);
var iterationCount = 100;
Observable
.Range(0, iterationCount)
.SelectMany(n => Observable.Start(() => n.ToString(), scheduler)
.Do(x => Go(x)))
.Wait();
(iterationCount == _testStrings.Count).Dump();
(2 == _threadIds.Distinct().Count()).Dump();
}
Run Code Online (Sandbox Code Playgroud)
现在,我使用Do(...)/ .Wait()combo为您提供了一个阻塞.Subscribe(...)方法.
结果是你的断言都返回true.