Ome*_*Mor 25 c# linq system.reactive
我正在尝试建模一个不简单的Rx查询(对我而言):
每个男人都有以下属性:
/// <summary>
/// A single observation of a man: who he is, where he is, and whom he is looking at.
/// </summary>
class Man
{
    /// <summary>
    /// Sentinel stored in <see cref="LookingAt"/> when the man is not looking at any woman.
    /// </summary>
    public const int LookingAtNobody = 0;

    /// <summary>Identifier of the man.</summary>
    public int Id { get; set; }

    /// <summary>The man's location at the time of this observation.</summary>
    public double Location { get; set; }

    /// <summary>
    /// Id of the woman being looked at (the queries in this file compare it with
    /// <c>Woman.Id</c>), or <see cref="LookingAtNobody"/>.
    /// </summary>
    public int LookingAt { get; set; }
}
Run Code Online (Sandbox Code Playgroud)每个女人都有以下特性:
/// <summary>
/// A single observation of a woman: who she is and where she is.
/// </summary>
class Woman
{
    /// <summary>Identifier of the woman; the queries in this file match it against <c>Man.LookingAt</c>.</summary>
    public int Id { get; set; }

    /// <summary>The woman's location at the time of this observation.</summary>
    public double Location { get; set; }
}
Run Code Online (Sandbox Code Playgroud)我们用 IObservable<IObservable<Man>> 表示所有男人,用 IObservable<IObservable<Woman>> 表示所有女人。
你如何使用Rx生成从男人到他们正在看的女人的向量:IObservable<IObservable<Tuple<double,double>>>?
为了帮助,这里有一些针对一些简单情况的单元测试:
/// <summary>
/// Virtual-time tests that specify the expected "vectors" (man location, woman location)
/// for a few simple scenarios. Each test builds hot observables on a TestScheduler,
/// runs them through <c>runQuery</c>, and compares the recorded inner sequences with
/// the expected notifications.
/// </summary>
public class Tests : ReactiveTest
{
// Scenario: man 1 starts looking at woman 10 at tick 200; the man's sequence completes
// at 300, before the woman's (350). The expected vector completes at 300.
[Test]
public void Puzzle1()
{
var scheduler = new TestScheduler();
var m1 = scheduler.CreateHotObservable(
OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
OnCompleted<Man>(300));
var w1 = scheduler.CreateHotObservable(
OnNext(150, new Woman { Id = 10, Location = 10.0 }),
OnNext(250, new Woman { Id = 10, Location = 20.0 }),
OnCompleted<Woman>(350));
// Both inner sequences are published by the outer sequences at tick 50.
var men = scheduler.CreateHotObservable(OnNext(50, m1));
var women = scheduler.CreateHotObservable(OnNext(50, w1));
var results = runQuery(scheduler, women, men);
// Keep only OnNext notifications of the outer sequence; each value is the list of
// messages recorded for one inner vector sequence.
var innerResults = (from msg in results
where msg.Value.HasValue
select msg.Value.Value).ToArray();
var expectedVector1 = new[]
{
OnNext(200, Tuple.Create(2.0, 10.0)),
OnNext(250, Tuple.Create(2.0, 20.0)),
OnCompleted<Tuple<double,double>>(300),
};
ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
}
// Scenario: same as Puzzle1, but the man's sequence completes at 400, after the
// woman's (350). The expected vector completes at 350.
[Test]
public void Puzzle2()
{
var scheduler = new TestScheduler();
var m1 = scheduler.CreateHotObservable(
OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
OnCompleted<Man>(400));
var w1 = scheduler.CreateHotObservable(
OnNext(150, new Woman { Id = 10, Location = 10.0 }),
OnNext(250, new Woman { Id = 10, Location = 20.0 }),
OnCompleted<Woman>(350));
var men = scheduler.CreateHotObservable(OnNext(50, m1));
var women = scheduler.CreateHotObservable(OnNext(50, w1));
var results = runQuery(scheduler, women, men);
var innerResults = (from msg in results
where msg.Value.HasValue
select msg.Value.Value).ToArray();
var expectedVector1 = new[]
{
OnNext(200, Tuple.Create(2.0, 10.0)),
OnNext(250, Tuple.Create(2.0, 20.0)),
OnCompleted<Tuple<double,double>>(350),
};
ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
}
// Scenario: the man looks at woman 10 from tick 200 and switches back to
// LookingAtNobody at tick 300, while both sequences are still running.
// The expected vector completes at 300.
[Test]
public void Puzzle3()
{
var scheduler = new TestScheduler();
var m1 = scheduler.CreateHotObservable(
OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
OnNext(300, new Man { Id = 1, Location = 2.0, LookingAt = Man.LookingAtNobody }),
OnCompleted<Man>(400));
var w1 = scheduler.CreateHotObservable(
OnNext(150, new Woman { Id = 10, Location = 10.0 }),
OnNext(250, new Woman { Id = 10, Location = 20.0 }),
OnCompleted<Woman>(350));
var men = scheduler.CreateHotObservable(OnNext(50, m1));
var women = scheduler.CreateHotObservable(OnNext(50, w1));
var results = runQuery(scheduler, women, men);
var innerResults = (from msg in results
where msg.Value.HasValue
select msg.Value.Value).ToArray();
var expectedVector1 = new[]
{
OnNext(200, Tuple.Create(2.0, 10.0)),
OnNext(250, Tuple.Create(2.0, 20.0)),
OnCompleted<Tuple<double,double>>(300),
};
ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
}
// Scenario: two women. The man looks at woman 10 from tick 200 and switches to
// woman 20 at tick 300. Two vectors are expected: the first ends at 300 (the switch),
// the second ends at 455 when woman 20's sequence completes.
[Test]
public void Puzzle4()
{
var scheduler = new TestScheduler();
var m1 = scheduler.CreateHotObservable(
OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
OnNext(300, new Man { Id = 1, Location = 3.0, LookingAt = 20 }),
OnNext(400, new Man { Id = 1, Location = 4.0, LookingAt = 20 }),
OnCompleted<Man>(500));
var w1 = scheduler.CreateHotObservable(
OnNext(150, new Woman { Id = 10, Location = 10.0 }),
OnNext(250, new Woman { Id = 10, Location = 20.0 }),
OnCompleted<Woman>(350));
var w2 = scheduler.CreateHotObservable(
OnNext(155, new Woman { Id = 20, Location = 100.0 }),
OnNext(255, new Woman { Id = 20, Location = 200.0 }),
OnNext(355, new Woman { Id = 20, Location = 300.0 }),
OnCompleted<Woman>(455));
var men = scheduler.CreateHotObservable(OnNext(50, m1));
var women = scheduler.CreateHotObservable(OnNext(50, w1), OnNext(50, w2));
var results = runQuery(scheduler, women, men);
var innerResults = (from msg in results
where msg.Value.HasValue
select msg.Value.Value).ToArray();
var expectedVector1 = new[]
{
OnNext(200, Tuple.Create(2.0, 10.0)),
OnNext(250, Tuple.Create(2.0, 20.0)),
OnCompleted<Tuple<double,double>>(300),
};
// The first element pairs the man's location at 300 with woman 20's latest
// location at that time (200.0, emitted at tick 255).
var expectedVector2 = new[]
{
OnNext(300, Tuple.Create(3.0, 200.0)),
OnNext(355, Tuple.Create(3.0, 300.0)),
OnNext(400, Tuple.Create(4.0, 300.0)),
OnCompleted<Tuple<double,double>>(455),
};
ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
ReactiveAssert.AreElementsEqual(expectedVector2, innerResults[1]);
}
/// <summary>
/// Builds the vector query over <paramref name="men"/> and <paramref name="women"/>,
/// runs it on <paramref name="scheduler"/>, and returns the recorded outer messages.
/// Each outer OnNext value is the message list of a test observer subscribed to one
/// inner vector sequence.
/// </summary>
/// <param name="scheduler">Virtual-time scheduler that drives the run and creates the recording observers.</param>
/// <param name="women">Outer sequence of per-woman observation sequences.</param>
/// <param name="men">Outer sequence of per-man observation sequences.</param>
/// <returns>The messages recorded for the outer query.</returns>
private static IEnumerable<Recorded<Notification<IList<Recorded<Notification<Tuple<double, double>>>>>>> runQuery(TestScheduler scheduler, IObservable<IObservable<Woman>> women, IObservable<IObservable<Man>> men)
{
// assuming nested sequences are hot
// NOTE(review): in Rx the `on X equals Y` clauses of a join are presumably duration
// selectors rather than key comparisons, and the inner join reuses the outer
// sequences as durations — confirm this is the intended query.
var vectors =
from manDuration in men
join womanDuration in women on manDuration equals womanDuration
select from man in manDuration
join woman in womanDuration on manDuration equals womanDuration
where man.LookingAt == woman.Id
select Tuple.Create(man.Location, woman.Location);
// Map each inner vector sequence to the message list of a fresh recording observer.
// The list is returned right after subscribing; presumably the observer keeps
// appending to it as virtual time advances. The subscription is never disposed.
var query = vectors.Select(vectorDuration =>
{
var vectorResults = scheduler.CreateObserver<Tuple<double, double>>();
vectorDuration.Subscribe(vectorResults);
return vectorResults.Messages;
});
// Start arguments after the factory are presumably created = 0, subscribed = 0,
// disposed = 1000 (virtual ticks) — confirm against the TestScheduler.Start overload.
var results = scheduler.Start(() => query, 0, 0, 1000).Messages;
return results;
}
}
Run Code Online (Sandbox Code Playgroud)
(注意:这个问题被交叉发布到Rx论坛:http://social.msdn.microsoft.com/Forums/en-US/rx/thread/e73ae4e2-68c3-459a-a5b6-ea957b205abe)
如果我理解正确的话,目标是创建一个由 “follow observables” 组成的可观察量,其中每个 “follow observable” 在一个男人开始看一个女人时开始,在这个男人不再看这个女人时结束。“follow observable” 应该由男人和女人最近位置的元组组成。
这里的想法是使用 CombineLatest:它接受两个可观察量,当其中任何一个产生一个值时,组合函数会针对两个可观察量的最新值进行求值,从而在组合后的可观察量中产生一个值。但是,CombineLatest 仅当两个可观察量都完成时才完成。在这种情况下,我们希望在两个源中的任何一个完成时就完成结果可观察量。为此,我们定义以下扩展方法(我认为这样的方法尚不存在,但可能有更简单的解决方案):
/// <summary>
/// Relays <paramref name="source"/> until it terminates or until
/// <paramref name="lifetime"/> completes, whichever happens first.
/// Values produced by <paramref name="lifetime"/> are ignored; only its
/// termination matters.
/// </summary>
/// <typeparam name="TSource">Element type of the relayed sequence.</typeparam>
/// <typeparam name="TWhile">Element type of the lifetime sequence (values are discarded).</typeparam>
/// <param name="source">Sequence whose notifications are forwarded.</param>
/// <param name="lifetime">Sequence whose completion ends the result.</param>
/// <returns>A sequence that mirrors <paramref name="source"/> and completes early when <paramref name="lifetime"/> completes.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="lifetime"/> is null.</exception>
public static IObservable<TSource>
    UntilCompleted<TSource, TWhile>(this IObservable<TSource> source,
                                    IObservable<TWhile> lifetime)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (lifetime == null)
    {
        throw new ArgumentNullException("lifetime");
    }

    return Observable.Create<TSource>(observer =>
    {
        var subscription = source.Subscribe(observer);
        var limiter = lifetime.Subscribe(
            next => { },
            // Without an error handler a failure in lifetime would be rethrown
            // instead of reaching the observer; forward it so the result fails cleanly.
            observer.OnError,
            () =>
            {
                // Stop relaying the source before signalling completion.
                subscription.Dispose();
                observer.OnCompleted();
            });
        return new CompositeDisposable(subscription, limiter);
    });
}
Run Code Online (Sandbox Code Playgroud)

此方法类似于 TakeUntil,但它不是在 lifetime 产生一个值时结束,而是在 lifetime 完成时结束。我们还可以定义一个简单的扩展方法,它取出满足谓词的第一个条纹(streak):
/// <summary>
/// Returns the first run of consecutive elements that satisfy
/// <paramref name="predicate"/>: leading non-matching elements are skipped,
/// and the result ends at the first element that fails the predicate afterwards.
/// </summary>
/// <typeparam name="TSource">Element type of the sequence.</typeparam>
/// <param name="source">Sequence to take the streak from.</param>
/// <param name="predicate">Condition that defines membership in the streak.</param>
/// <returns>The first matching streak of <paramref name="source"/>.</returns>
public static IObservable<TSource>
    Streak<TSource>(this IObservable<TSource> source,
                    Func<TSource, bool> predicate)
{
    // Drop everything before the first matching element...
    var fromFirstMatch = source.SkipWhile(item => !predicate(item));

    // ...then keep elements only for as long as they keep matching.
    var firstRun = fromFirstMatch.TakeWhile(predicate);
    return firstRun;
}
Run Code Online (Sandbox Code Playgroud)

现在,对于最终查询,我们使用 CombineLatest 组合所有男人和所有女人,并使用 UntilCompleted 让结果可观察量在任一源完成时提前完成。为了获得 “follow observables”,我们选择男人看着女人的条纹。然后我们简单地将其映射到位置元组。
// Pair every man sequence with every woman sequence. For each pair, combine the
// latest observations, stop as soon as either side completes, keep only the first
// streak in which the man looks at that woman, and project it to location tuples.
var vectors = men.SelectMany(
    manDuration => women,
    (manDuration, womanDuration) => manDuration
        .CombineLatest(womanDuration, (man, woman) => new { Man = man, Woman = woman })
        .UntilCompleted(womanDuration)
        .UntilCompleted(manDuration)
        .Streak(latest => latest.Man.LookingAt == latest.Woman.Id)
        .Select(latest => Tuple.Create(latest.Man.Location, latest.Woman.Location)));
Run Code Online (Sandbox Code Playgroud)

这通过了你所有的测试,但它不能处理这样的场景:一个男人看女人 10 一会儿,然后看 20 一会儿,然后又看 10 一会儿;这里仅使用第一个条纹。要观察所有条纹,我们可以使用以下扩展方法,该方法返回由条纹组成的可观察量:
/// <summary>
/// Splits <paramref name="source"/> into streaks: each inner sequence contains one
/// run of consecutive elements that satisfy <paramref name="predicate"/>. A new inner
/// sequence is emitted when the predicate turns true and completes when it turns false
/// or when the source completes.
/// </summary>
/// <typeparam name="TSource">Element type of the sequence.</typeparam>
/// <param name="source">Sequence to split; it is subscribed to once per subscription to the result.</param>
/// <param name="predicate">Condition that defines membership in a streak.</param>
/// <returns>A sequence of streaks, in the order they start.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
public static IObservable<IObservable<TSource>>
    Streaks<TSource>(this IObservable<TSource> source,
                     Func<TSource, bool> predicate)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (predicate == null)
    {
        throw new ArgumentNullException("predicate");
    }

    return Observable.Create<IObservable<TSource>>(observer =>
    {
        // Streak currently being filled (or the last finished one); null until the first match.
        ReplaySubject<TSource> subject = null;
        // Whether the previous element satisfied the predicate.
        bool previous = false;
        return source.Subscribe(x =>
        {
            bool current = predicate(x);
            if (!previous && current)
            {
                // Rising edge: open a new streak and publish it before its first element.
                subject = new ReplaySubject<TSource>();
                observer.OnNext(subject);
            }
            if (previous && !current)
            {
                // Falling edge: close the open streak.
                subject.OnCompleted();
            }
            if (current)
            {
                subject.OnNext(x);
            }
            previous = current;
        }, error =>
        {
            // Forward source failures to the streak and to the outer observer
            // instead of letting them be rethrown by a handler-less Subscribe.
            if (subject != null)
            {
                subject.OnError(error);
            }
            observer.OnError(error);
        }, () =>
        {
            if (subject != null)
            {
                subject.OnCompleted();
            }
            observer.OnCompleted();
        });
    });
}
Run Code Online (Sandbox Code Playgroud)

通过仅订阅源流一次并使用 ReplaySubject,此方法适用于热和冷可观察量。现在对于最终查询,我们选择所有条纹,如下所示:
// Pair every man sequence with every woman sequence. For each pair, combine the
// latest observations, stop as soon as either side completes, split the result into
// every streak in which the man looks at that woman, and project each streak to
// location tuples.
var vectors = men
    .SelectMany(
        manDuration => women,
        (manDuration, womanDuration) => new { manDuration, womanDuration })
    .SelectMany(
        pairing => pairing.manDuration
            .CombineLatest(pairing.womanDuration, (man, woman) => new { Man = man, Woman = woman })
            .UntilCompleted(pairing.womanDuration)
            .UntilCompleted(pairing.manDuration)
            .Streaks(latest => latest.Man.LookingAt == latest.Woman.Id),
        (pairing, streak) => streak.Select(latest =>
            Tuple.Create(latest.Man.Location, latest.Woman.Location)));
Run Code Online (Sandbox Code Playgroud)