Cri*_*scu 0 c# parallel-processing system.reactive
我有以下代码片段,它枚举了某些xml的元素(从svn log --xml ...进程的输出中读取),然后为每个xml元素运行一个长时间运行的方法.
var proc = Process.Start(svnProcInfo);
var xml = XDocument.Load(proc.StandardOutput);
var xElements = xml.Descendants("path")
.ToObservable()
//.SubscribeOn(ThreadPoolScheduler.Instance)
.Select(descendant => return LongRunning(descendant));
xElements
//.SubscribeOn(NewThreadScheduler.Default)
.Subscribe(result => Console.WriteLine(result);
Console.ReadKey();
Run Code Online (Sandbox Code Playgroud)
该LongRunning方法并不重要,但在其中我记录了它运行的线程.让我们假设它运行一整秒.
我的问题是,取消评论任何SubscribeOn()一行都没有任何效果.调用LongRunning是顺序的,每隔一秒发生一次,在同一个线程上(尽管与主(初始)线程不同).
这是一个控制台应用程序.
我是Rx的新手.我错过了什么?
编辑:
在尝试了Lee Campbell的回答之后,我注意到了另一个问题.
Console.Error.WriteLine("Main thread " + Thread.CurrentThread.ManagedThreadId);
var xElements = xml.Descendants("path").ToObservable()
//.ObserveOn(Scheduler.CurrentThread)
.SelectMany(descendant =>
Observable.Start(()=>LongRunning(descendant),NewThreadScheduler.Default))
.Subscribe(result => Console.WriteLine(
"Result on: " + Thread.CurrentThread.ManagedThreadId));
[...]
string LongRunning(XElement el)
{
Console.WriteLine("Execute on: Thread " + Thread.CurrentThread.ManagedThreadId);
DoWork();
Console.WriteLine("Finished on Thread " + Thread.CurrentThread.ManagedThreadId);
return "something";
}
Run Code Online (Sandbox Code Playgroud)
这给出了以下输出:
Main thread 1
Execute on: Thread 3
Execute on: Thread 4
Execute on: Thread 5
Execute on: Thread 6
Execute on: Thread 7
Finished on Thread 5
Finished on Thread 6
Result on: 5
Result on: 6
Finished on Thread 7
Result on: 7
Finished on Thread 3
Result on: 3
Finished on Thread 4
Result on: 4
Done! Press any key...
Run Code Online (Sandbox Code Playgroud)
我需要的是一种将结果"排队"到同一个线程的方法.我认为这ObserveOn()是为了什么,但是对ObserveOn()上述行不予评论并不会改变结果.
首先,Rx是用于控制异步的库(或范例),特别是可观察的序列.你在这里有一个可枚举的序列(Xml Descendants)和一个阻塞/同步LongRunning方法调用.
通过调用ToObservable()你的可枚举序列,你实际上只是遵守接口,但是当你的序列被实现时(急切而不是懒惰),没有什么真正的Observable/Async.
通过调用SubscribeOn,您有正确的想法,但转换已经在ToObservable()运营商中完成.你可能打算调用,ToObservable(ThreadPoolScheduler.Instance)以便IEnumerable可以在另一个线程上完成任何缓慢的迭代.但是......我认为这不会是一个缓慢的迭代器,所以这可能无法解决任何问题.
您最想要做的事情(如果Rx是此类问题的最佳工具,这是可疑的)是安排对LongRunning方法的调用.但是,这意味着您需要将Asyncrony添加到您的选择中.一个很好的方法是使用Rx Factory方法之一Observable.FromAsync或者Observable.Start.然而,这将使您的序列成为一个IObservable<IObservable<T>>.您可以使用SelectMany或来展平它Merge.
说完这一切之后,我想你想做的是:
var proc = Process.Start(avnProcInfo);
var xml = XDocument.Load(proc.StandardOutput);
//EDIT: Added ELS to serialise results onto a single thread.
var els = new EventLoopScheduler(threadStart=>new Thread(threadStart)
{
IsBackground=true,
Name="MyEventLoopSchedulerThread"
});
var xElements = xml.Descendants("path").ToObservable()
.SelectMany(descendant => Observable.Start(()=>LongRunning(descendant),ThreadPoolScheduler.Instance))
.ObserveOn(els)
.Subscribe(result => Console.WriteLine(result));
Console.ReadKey();
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1735 次 |
| 最近记录: |