len*_*kan 5 .net c# system.reactive
基本上,我有一个可观察的输入字符串,我想单独处理,然后对结果做一些事情.如果输入字符串包含逗号(作为分隔符),我想分割字符串并单独处理每个子字符串,然后对每个字符串序列执行某些操作.下面的代码段说明了我尝试做的简化版本:
[Fact]
public void UniTest1()
{
var observable = new ReplaySubject<string>();
observable.OnNext("a,b");
observable.OnNext("c,d,e");
observable.OnCompleted();
var result = new List<string[]>();
observable
.SelectMany(x => x.Split(','))
.Select(x => x.ToUpper())
.ToArray() // How to collect an IEnumerable for each item here?
.Do(s => result.Add(s))
.Subscribe();
// Here, result is actually {{"A","B","C","D","E"}}, I need {{"A","B"},{"C","D","E"}}
Assert.Equal(2, result.Count);
Assert.Equal("A", result[0][0]);
Assert.Equal("B", result[0][1]);
Assert.Equal("C", result[1][0]);
Assert.Equal("D", result[1][1]);
Assert.Equal("E", result[1][2]);
}
Run Code Online (Sandbox Code Playgroud)
正如评论中所解释的,上述方法不起作用..ToArray() - 调用将整个observable连接成一个序列.
但是,我已经通过将拆分和处理放入单个操作来解决这个问题,如下所示:
[Fact]
public void UniTest2()
{
var observable = new ReplaySubject<string>();
observable.OnNext("a,b");
observable.OnNext("c,d,e");
observable.OnCompleted();
var result = new List<string[]>();
observable
.Select(x => x.Split(',').Select(s => s.ToUpper()).ToArray())
.Do(s => result.Add(s))
.Subscribe();
// Result is as expected: {{"A","B"},{"C","D","E"}}
Assert.Equal(2, result.Count);
Assert.Equal("A", result[0][0]);
Assert.Equal("B", result[0][1]);
Assert.Equal("C", result[1][0]);
Assert.Equal("D", result[1][1]);
Assert.Equal("E", result[1][2]);
}
Run Code Online (Sandbox Code Playgroud)
但有没有办法,使用Rx,通过不将分裂和处理放在同一个动作中来解决这个问题?这个问题的推荐解决方案是什么?
我还要提一下,处理,即ToUpper() - 调用,实际上是一个Web服务调用.我在我的例子中使用了ToUpper(),这样我的问题就很容易解释了.但这意味着我希望这种处理能够并行完成并且不会阻塞.
您最终在代码中提出了许多值得一提的事情。
首先,该.ToArray()运算符采用返回零个或多个单个值的可观察量,并将其更改为返回零个或多个值的单个数组的可观察量。这样的可观察量必须先完成,然后才能返回其唯一的值。
考虑到这一点,第一个查询的结果应该是有意义的。
您的第二个查询会产生x.Split(',').Select(s => s.ToUpper()).ToArray()您想要的输出,但您想知道“是否有一种方法,使用 RX,通过不将拆分和处理放在同一操作中来解决此问题”。
好吧,简单地说,是的:
var result = new List<string[]>();
observable
.Select(x => x.Split(','))
.Select(x => x.Select(s => s.ToUpper()))
.Select(x => x.ToArray())
.Do(s => result.Add(s))
.Subscribe();
Run Code Online (Sandbox Code Playgroud)
但是,这不会并行处理这些项目。Rx 被设计为串行工作,除非您调用引入并行性的操作。
通常,一种简单的方法是进行长时间运行的选择,例如.Select(x => longRunningOperation(x))并用它来执行此操作:
.SelectMany(x => Observable.Start(() => longRunningOperation(x)))
Run Code Online (Sandbox Code Playgroud)
对于您的情况,您可以这样做:
observable
.ObserveOn(Scheduler.Default)
.SelectMany(x => Observable.Start(() => x.Split(',')))
.SelectMany(x => Observable.Start(() => x.Select(s => s.ToUpper())))
.SelectMany(x => Observable.Start(() => x.ToArray()))
.Do(s => result.Add(s))
.Subscribe();
Run Code Online (Sandbox Code Playgroud)
但这只是并行化每个原始.OnNext调用,而不是其中的处理。为此,您需要将结果转换x.Split(',')为可观察的结果,并并行处理它。
observable
.SelectMany(x => Observable.Start(() => x.Split(',').ToObservable()))
.SelectMany(x => Observable.Start(() => x.SelectMany(s => Observable.Start(() => s.ToUpper()))))
.SelectMany(x => Observable.Start(() => x.ToArray()))
.Do(s => s.Do(t => result.Add(t)))
.Merge()
.Subscribe();
Run Code Online (Sandbox Code Playgroud)
但这开始看起来很疯狂,它不再在当前线程上运行,这意味着您的测试不会等待结果。
让我们重新看看这个查询。
我已经开始挂断电话了.Do。这些通常有利于调试,但对于任何状态更改来说它们都是不好的。它们可以在查询中的任何线程上的任何点运行,因此您需要确保调用中的代码.Do是线程安全的,并且调用result.Add(s)不是线程安全的。
我还引入了一个“webservice”调用来替换.ToUpper()一秒的处理延迟,以便我们可以看到查询需要多长时间来处理,从而知道它是否并行运行。如果最终查询需要 5 秒才能运行,则不存在并行性,如果少于 5 秒,则我们获胜。
因此,如果我以最基本的方式编写查询,它看起来像这样:
Func<string, string> webservice = x =>
{
Thread.Sleep(1000);
return x.ToUpper();
};
var query =
observable
.Select(ls =>
from p in ls.Split(',')
select webservice(p))
.Select(rs => rs.ToArray())
.ToArray()
.Select(rss => new List<string[]>(rss));
var sw = Stopwatch.StartNew();
List<string[]> result = query.Wait();
sw.Stop();
Run Code Online (Sandbox Code Playgroud)
当我运行这个程序时,我得到了预期的结果{{"A","B"},{"C","D","E"}},但只需要 5 秒多一点就可以完成。这里没有预期的并行性。
现在让我们介绍一些并行性:
var query =
observable
.Select(ls =>
from p in ls.Split(',').ToObservable()
from r in Observable.Start(() => webservice(p))
select r)
.Select(rs => rs.ToArray())
.Merge()
.ToArray()
.Select(rss => new List<string[]>(rss));
Run Code Online (Sandbox Code Playgroud)
我基本上应用了上面描述的“ Selectto SelectMany/ ”模式。Start唯一棘手的部分是,它.Select(rs => rs.ToArray())从一个变成了IObservable<string[]>一个IObservable<IObservable<string[]>>,所以我突然把.Merge()它压平了。当您将并行性引入 Rx 查询时,这是正常的。
现在,当我运行查询时 - BOOM - 仅一秒多一点。所有五个输入都并行运行。现在唯一的问题是顺序不再是决定性的。但当结果并行执行时,你就无能为力了。
一次这样的运行我得到了这个结果:
如果我将此作为测试来运行,我会将结果按已知顺序排序,并将其与预期结果进行比较。
| 归档时间: |
|
| 查看次数: |
628 次 |
| 最近记录: |