在将 Rx 事件提供给订阅者之前和之后修改 C# 中的 Rx 事件

Den*_*voy 3 c# system.reactive

每次观察到事件时,我都需要对其进行预处理和后处理。这是我想出的东西,它有效:

var subj = new Subject<Event>();
var observable = Observable.Create<Event>(obs =>
{
    subj.Finally(obs.OnCompleted); 
    return subj.Subscribe(e =>
    {
        try
        {
            Preprocess(e);
            obs.OnNext(e);
            Postprocess(e);
        }
        catch (Exception ex) { obs.OnError(ex); }
    });
});
Run Code Online (Sandbox Code Playgroud)

我的问题:这是正确的方法,还是有更好的模板/扩展方法?

Jam*_*rld 5

分析

这里有一些微妙之处。

你的问题的标题是提到修改事件 - 但你真的不应该这样做。我假设该事件的处理前和处理后报告,但不要修改它。如果您需要这样做,最好将Do以下更改为Select并让 Preprocess 返回值的修改副本 - 这种不可变方法更加明确且不易出错。不清楚事件的后修改在可观察流中如何有意义 - 我会避免它并让订阅者报告观察。解决方案中没有任何内容在技术上阻止您改变处理方法中的事件 - 但是如果您正在改变并且有多个下游订阅者,则确实需要非常小心地处理行为。这种变异不是惯用的 Rx,也不是一般的良好编程习惯。

有许多下游操作符引入了异步性,因此如果您试图记录最终订阅者的观察结果,则不能保证它会在您的 Postprocess 被调用时发生。例如,Delay在订阅者之前插入一个简单的内容可能会让您失望,因为 Postprocess 将在订阅者看到事件之前被调用。您唯一的保证是在直接下游订阅者返回后调用 Postprocess OnNext- 就是这样。你不能说超过那个点的时间,所以它只是狭义上的后期处理。这就是为什么我会将“后处理”放在订阅者中,该订阅者执行您认为的“最终”操作或(如果该订阅者是同步的)紧接在它之前。

您对 的调用Finally不会做任何事情 - 它不会修改它所应用的 observable,它返回一个新的 observable,其中包含您正在丢弃的行为。

您正在捕获订阅者抛出的异常。这是特别微妙的 - 您不应该这样做,或者之后发送OnError给订阅者,因为订阅者现在处于未知错误状态。我在如何处理 RxJava 中观察者的 onNext 抛出的异常中详细讨论了这一点?(答案适用于 .NET)。您的问题并不清楚您是否应该在订阅者失败的情况下进行后处理,但由于您的实现没有尝试这样做,我也没有。

你的实现不通过传球OnCompletedOnError事件从上游源(subj你的情况)。

解决方案

尽管有上述注意事项,但这里有一种可能对您有用的方法。

您可以使用Do(或用于变异,Select将 preprocess 更改为 aFunc<T>如上所述)来处理事物的预处理方面,这使事情变得更容易。这是一个方便的自定义运算符来管理它:

public static class ObservableExtensions
{
    public static IObservable<T> Process<T>(
        this IObservable<T> source,
        Action<T> preprocess,
        Action<T> postprocess)
    {
        return Observable.Create<T>(o =>
            source.Do(preprocess).Subscribe(x =>
            {
                o.OnNext(x);
                try
                {
                    postprocess(x);
                }
                catch (Exception e)
                {
                    o.OnError(e);
                }
            },
            o.OnError,
            o.OnCompleted)
        );
    }
}
Run Code Online (Sandbox Code Playgroud)

Do 会将预处理错误作为 an 正确传播OnError,而 try-catch 将处理后处理错误。如上所述,我们故意不处理订阅者中的错误。该Create方法将正确执行 Rx 语法的其余部分。

像这样使用它:

subj.Process(Preprocess, PostProcess)
    .Subscribe(/* observer or handlers etc. */);
Run Code Online (Sandbox Code Playgroud)

测试

在这里,使用反应式测试框架(nuget rx-testing)和断言库 Shouldly(nuget shouldly)是对这个操作符的一些单元测试:

public class TestProcess : ReactiveTest
{
    [Test]
    public void ErrorFreeStreamProcessedCorrectly()
    {
        var expected = new List<string>
        {
            "Preprocess(1)", "1", "Postprocess(1)",
            "Preprocess(2)", "2", "Postprocess(2)"
        };
        
        var actual = new List<string>();
        
        var scheduler = new TestScheduler();
        
        var xs = scheduler.CreateColdObservable<int>(
            OnNext(100, 1),
            OnNext(200, 2),
            OnCompleted<int>(300)
        );

        var sut = xs.Process(
            x => actual.Add($"Preprocess({x})"),
            x => actual.Add($"Postprocess({x})"));
            
        var result = scheduler.CreateObserver<int>();               
        
        sut.Do(x => actual.Add(x.ToString())).Subscribe(result);
        
        scheduler.Start();

        result.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(200, 2),
            OnCompleted<int>(300)
        );
        
        actual.ShouldBe(expected);
    }

    [Test]
    public void ErrorInPreprocessHandledCorrectly()
    {
        var expected = new List<string>
        {
            "Preprocess(1)", "1", "Postprocess(1)",
            "Error"
        };

        var expectedException = new ApplicationException("Error");

        var actual = new List<string>();

        var scheduler = new TestScheduler();

        var xs = scheduler.CreateColdObservable<int>(
            OnNext(100, 1),
            OnNext(200, 2),
            OnCompleted<int>(300)
        );

        var sut = xs.Process(
            x => actual.Add(x == 1 ? $"Preprocess({x})" : throw expectedException),
            x => actual.Add($"Postprocess({x})"));

        var result = scheduler.CreateObserver<int>();

        sut.Do(x => actual.Add(x.ToString()),
               e => actual.Add(e.Message)).Subscribe(result);

        scheduler.Start();

        result.Messages.AssertEqual(
            OnNext(100, 1),
            OnError<int>(200, expectedException)
        );

        actual.ShouldBe(expected);
    }

    [Test]
    public void ErrorInPostprocessHandledCorrectly()
    {
        var expected = new List<string>
        {
            "Preprocess(1)", "1", "Postprocess(1)",
            "Preprocess(2)", "2", "Error"
        };

        var expectedException = new ApplicationException("Error");

        var actual = new List<string>();

        var scheduler = new TestScheduler();

        var xs = scheduler.CreateColdObservable<int>(
            OnNext(100, 1),
            OnNext(200, 2),
            OnCompleted<int>(300)
        );

        var sut = xs.Process(
            x => actual.Add($"Preprocess({x})"),
            x => actual.Add(x == 1 ? $"Postprocess({x})" : throw expectedException));

        var result = scheduler.CreateObserver<int>();

        sut.Do(x => actual.Add(x.ToString()),
               e => actual.Add(e.Message)).Subscribe(result);

        scheduler.Start();

        result.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(200, 2),
            OnError<int>(200, expectedException)
        );

        actual.ShouldBe(expected);
    }

    [Test]
    public void ErrorInSubscriberHandledCorrectly()
    {
        var expected = new List<string>
        {
            "Preprocess(1)", "1", "Postprocess(1)",
            "Preprocess(2)"
        };

        var expectedException = new ApplicationException("Error");

        var actual = new List<string>();

        var scheduler = new TestScheduler();

        var xs = scheduler.CreateColdObservable<int>(
            OnNext(100, 1),
            OnNext(200, 2),
            OnCompleted<int>(300)
        );

        var sut = xs.Process(
            x => actual.Add($"Preprocess({x})"),
            x => actual.Add($"Postprocess({x})"));

        var result = scheduler.CreateObserver<int>();

        sut.Subscribe(
            x => { if (x != 1) throw expectedException; else actual.Add(x.ToString()); result.OnNext(x); },
            result.OnError,
            result.OnCompleted);
        
        try
        {           
            scheduler.Start();
        }
        catch
        {
        
        }       

        result.Messages.AssertEqual(
            OnNext(100, 1)
        );

        actual.ShouldBe(expected);
    }
}
Run Code Online (Sandbox Code Playgroud)