Den*_*voy 3 c# system.reactive
每次观察到事件时,我都需要对其进行预处理和后处理。这是我想出的东西,它有效:
var subj = new Subject<Event>();
var observable = Observable.Create<Event>(obs =>
{
subj.Finally(obs.OnCompleted);
return subj.Subscribe(e =>
{
try
{
Preprocess(e);
obs.OnNext(e);
Postprocess(e);
}
catch (Exception ex) { obs.OnError(ex); }
});
});
Run Code Online (Sandbox Code Playgroud)
我的问题:这是正确的方法,还是有更好的模板/扩展方法?
这里有一些微妙之处。
你的问题的标题是提到修改事件 - 但你真的不应该这样做。我假设该事件的处理前和处理后报告,但不要修改它。如果您需要这样做,最好将Do以下更改为Select并让 Preprocess 返回值的修改副本 - 这种不可变方法更加明确且不易出错。不清楚事件的后修改在可观察流中如何有意义 - 我会避免它并让订阅者报告观察。解决方案中没有任何内容在技术上阻止您改变处理方法中的事件 - 但是如果您正在改变并且有多个下游订阅者,则确实需要非常小心地处理行为。这种变异不是惯用的 Rx,也不是一般的良好编程习惯。
有许多下游操作符引入了异步性,因此如果您试图记录最终订阅者的观察结果,则不能保证它会在您的 Postprocess 被调用时发生。例如,Delay在订阅者之前插入一个简单的内容可能会让您失望,因为 Postprocess 将在订阅者看到事件之前被调用。您唯一的保证是在直接下游订阅者返回后调用 Postprocess OnNext- 就是这样。你不能说超过那个点的时间,所以它只是狭义上的后期处理。这就是为什么我会将“后处理”放在订阅者中,该订阅者执行您认为的“最终”操作或(如果该订阅者是同步的)紧接在它之前。
您对 的调用Finally不会做任何事情 - 它不会修改它所应用的 observable,它返回一个新的 observable,其中包含您正在丢弃的行为。
您正在捕获订阅者抛出的异常。这是特别微妙的 - 您不应该这样做,或者之后发送OnError给订阅者,因为订阅者现在处于未知错误状态。我在如何处理 RxJava 中观察者的 onNext 抛出的异常中详细讨论了这一点?(答案适用于 .NET)。您的问题并不清楚您是否应该在订阅者失败的情况下进行后处理,但由于您的实现没有尝试这样做,我也没有。
你的实现不通过传球OnCompleted和OnError事件从上游源(subj你的情况)。
尽管有上述注意事项,但这里有一种可能对您有用的方法。
您可以使用Do(或用于变异,Select将 preprocess 更改为 aFunc<T>如上所述)来处理事物的预处理方面,这使事情变得更容易。这是一个方便的自定义运算符来管理它:
public static class ObservableExtensions
{
public static IObservable<T> Process<T>(
this IObservable<T> source,
Action<T> preprocess,
Action<T> postprocess)
{
return Observable.Create<T>(o =>
source.Do(preprocess).Subscribe(x =>
{
o.OnNext(x);
try
{
postprocess(x);
}
catch (Exception e)
{
o.OnError(e);
}
},
o.OnError,
o.OnCompleted)
);
}
}
Run Code Online (Sandbox Code Playgroud)
Do 会将预处理错误作为 an 正确传播OnError,而 try-catch 将处理后处理错误。如上所述,我们故意不处理订阅者中的错误。该Create方法将正确执行 Rx 语法的其余部分。
像这样使用它:
subj.Process(Preprocess, PostProcess)
.Subscribe(/* observer or handlers etc. */);
Run Code Online (Sandbox Code Playgroud)
在这里,使用反应式测试框架(nuget rx-testing)和断言库 Shouldly(nuget shouldly)是对这个操作符的一些单元测试:
public class TestProcess : ReactiveTest
{
[Test]
public void ErrorFreeStreamProcessedCorrectly()
{
var expected = new List<string>
{
"Preprocess(1)", "1", "Postprocess(1)",
"Preprocess(2)", "2", "Postprocess(2)"
};
var actual = new List<string>();
var scheduler = new TestScheduler();
var xs = scheduler.CreateColdObservable<int>(
OnNext(100, 1),
OnNext(200, 2),
OnCompleted<int>(300)
);
var sut = xs.Process(
x => actual.Add($"Preprocess({x})"),
x => actual.Add($"Postprocess({x})"));
var result = scheduler.CreateObserver<int>();
sut.Do(x => actual.Add(x.ToString())).Subscribe(result);
scheduler.Start();
result.Messages.AssertEqual(
OnNext(100, 1),
OnNext(200, 2),
OnCompleted<int>(300)
);
actual.ShouldBe(expected);
}
[Test]
public void ErrorInPreprocessHandledCorrectly()
{
var expected = new List<string>
{
"Preprocess(1)", "1", "Postprocess(1)",
"Error"
};
var expectedException = new ApplicationException("Error");
var actual = new List<string>();
var scheduler = new TestScheduler();
var xs = scheduler.CreateColdObservable<int>(
OnNext(100, 1),
OnNext(200, 2),
OnCompleted<int>(300)
);
var sut = xs.Process(
x => actual.Add(x == 1 ? $"Preprocess({x})" : throw expectedException),
x => actual.Add($"Postprocess({x})"));
var result = scheduler.CreateObserver<int>();
sut.Do(x => actual.Add(x.ToString()),
e => actual.Add(e.Message)).Subscribe(result);
scheduler.Start();
result.Messages.AssertEqual(
OnNext(100, 1),
OnError<int>(200, expectedException)
);
actual.ShouldBe(expected);
}
[Test]
public void ErrorInPostprocessHandledCorrectly()
{
var expected = new List<string>
{
"Preprocess(1)", "1", "Postprocess(1)",
"Preprocess(2)", "2", "Error"
};
var expectedException = new ApplicationException("Error");
var actual = new List<string>();
var scheduler = new TestScheduler();
var xs = scheduler.CreateColdObservable<int>(
OnNext(100, 1),
OnNext(200, 2),
OnCompleted<int>(300)
);
var sut = xs.Process(
x => actual.Add($"Preprocess({x})"),
x => actual.Add(x == 1 ? $"Postprocess({x})" : throw expectedException));
var result = scheduler.CreateObserver<int>();
sut.Do(x => actual.Add(x.ToString()),
e => actual.Add(e.Message)).Subscribe(result);
scheduler.Start();
result.Messages.AssertEqual(
OnNext(100, 1),
OnNext(200, 2),
OnError<int>(200, expectedException)
);
actual.ShouldBe(expected);
}
[Test]
public void ErrorInSubscriberHandledCorrectly()
{
var expected = new List<string>
{
"Preprocess(1)", "1", "Postprocess(1)",
"Preprocess(2)"
};
var expectedException = new ApplicationException("Error");
var actual = new List<string>();
var scheduler = new TestScheduler();
var xs = scheduler.CreateColdObservable<int>(
OnNext(100, 1),
OnNext(200, 2),
OnCompleted<int>(300)
);
var sut = xs.Process(
x => actual.Add($"Preprocess({x})"),
x => actual.Add($"Postprocess({x})"));
var result = scheduler.CreateObserver<int>();
sut.Subscribe(
x => { if (x != 1) throw expectedException; else actual.Add(x.ToString()); result.OnNext(x); },
result.OnError,
result.OnCompleted);
try
{
scheduler.Start();
}
catch
{
}
result.Messages.AssertEqual(
OnNext(100, 1)
);
actual.ShouldBe(expected);
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
191 次 |
| 最近记录: |