使用RX组成命令总线

Phi*_*rdt 1 .net c# system.reactive

我熟悉RX并且作为我的实验项目,我正在尝试创建一个简单的命令总线,在概念上类似于:

class Bus
{
    Subject<Command> commands;
    IObservable<Invocation> invocations;

    public Bus()
    {
        this.commands = new Subject<Command>();
        this.invocations = commands.Select(x => new Invocation { Command = x }).Publish();
    }

    public IObserver<Command> Commands
    {
        get { return this.commands; }
    }

    public IObservable<Invocation> Invocations
    {
        get { return this.invocations; }
    }
}

class Invocation
{
    public Command Command { get; set; }
    public bool Handled { get; set; }
}
Run Code Online (Sandbox Code Playgroud)

这个想法是模块可以在启动时使用Invocations属性安装命令处理程序,并可以将他们希望的任何过滤应用于他们的订阅.另一方面,客户端可以通过调用来触发命令执行Commands.OnNext(command).

但是,我希望总线能够保证提交的每个命令都只由一个处理程序处理.也就是说,OnNext一旦第一个处理程序将Invocation.Handled设置为true,理想情况下应该终止处理,如果在结束时仍然为假OnNext(),Invocation.Handled则应该抛出异常.

我玩了创建自己的ISubject,IObservable和IObserver实现,但这感觉"脏又便宜";)

我正在努力探索RX提供的组合能力.在组合方式中,我如何提供"一次性"保证?

感谢您提供的任何见解.

Ana*_*tts 5

实际上,你通常在这里得到了正确的想法.你只需要做实际的调度.为此,SelectMany将提供帮助:

class Bus
{
    Subject<Command> commands;
    Subject<Invocation> invocations;

    // TODO: Instantiate me
    List<Func<Command, bool>> handlerList; 

    public Bus()
    {
        this.commands = new Subject<Command>();
        this.invocations = new Subject<Invocation>();

        commands.SelectMany(x => {
            // This FirstOrDefault() is just good ol' LINQ
            var passedHandler = 
                handlerList.FirstOrDefault(handler => handler(x) == true);

            return passedHandler != null ?
                Observable.Return(new Invocation() { Command = x, Handled = true}) :
                Observable.Throw<Invocation>(new Exception("Unhandled!"));
        }).Multicast(invocations).Connect();
    }

    /* ... snip ... */
}
Run Code Online (Sandbox Code Playgroud)

但是,说实话,这并没有真正展示Rx的强大功能,因为它正在同步执行处理程序列表.让这个完全无阻塞让我们更加引人注目.

首先,我们将改变我们的Func原型Func<Command, IObservable<Invocation>>.这意味着,一个接受命令并产生Future Invocation结果的方法(a-la Task<T>).然后,我们可以获得相同的行为,但我们的处理程序通过此选择器异步(通过TextArea编码):

commands.SelectMany(x =>
    handlerList.ToObservable()
        .Select(h => Observable.Defer(() => h(x)))
        .Concat()
        .SkipWhile(x => x.Handled == false)
        .TakeLast(1))
    .Multicast(invocations).Connect();
Run Code Online (Sandbox Code Playgroud)

这是一个非常适合研究生使用的Rx,但我们的想法是,对于每个Command,我们最初会创建一个处理程序流并按顺序运行它们(这就是Defer + Concat所做的),直到找到一个处理是真的,然后拿最后一个.

外部SelectMany将命令流选择为未来结果流(即,该类型IO<IO<Invocation>>然后将其展平,因此它成为结果流.

没有阻塞,非常简洁,100%可测试,类型安全的代码,只是表达了一个非常复杂的想法,这将是非常难以写的命令.这就是为什么Rx很酷.