如何将n个可观察对象动态组合到一个列表中?

Ron*_*erg 6 system.reactive

我有一组observables,可以为所谓的生成状态更改Channel.我有一个ChannelSet应该监控这些渠道.

我想写这样的东西:如果一个频道可操作,频道设置已启动,否则频道设置已关闭.

IEnumerable<ChannelState> channelStates = ...;
if (channelStates.Any(cs => cs == ChannelState.Operational))
    channelSet.ChannelSetState = ChannelSetState.Up;
else
    channelSet.ChannelSetState = ChannelSetState.Down;
Run Code Online (Sandbox Code Playgroud)

但是我在哪里得到我的IEnumerable<ChannelState>?如果我有1个通道,我可以简单地订阅其状态更改并相应地修改通道集的状态.对于两个频道,我可以使用CombineLatest:

Observable.CombineLatest(channel0States, channel1States, (cs0, cs1) =>
    {
        if (cs0 == ChannelSetState.Up || cs1 == ChannelSetState.Up)
            return ChannelSetState.Up;
        else
            return ChannelSetState.Down;
    });
Run Code Online (Sandbox Code Playgroud)

但我有一个IEnumerable<Channel>和一个相应的IEnumerable<IObservable<ChannelState>>.我正在寻找类似的东西CombineLatest不限于固定数量的可观察量.

更复杂的是,可以添加和删除通道集合.因此,偶尔会添加一个频道.新渠道还会生成需要合并的状态更改.

所以我真正想要的是一个功能:

IEnumerable<IObservable<ChannelState>> --> IObservable<ChannelSetState>
Run Code Online (Sandbox Code Playgroud)

在输入发生变化时保持最新.应该有一些方法可以使用Rx实现这一点,但我无法弄清楚如何.

Eni*_*ity 3

有一种相当直接的方法可以用 Rx 做你想做的事情,但你需要只考虑可观察量,而不是混合可枚举量。

您真正需要考虑的函数签名是:

IObservable<IObservable<ChannelState>> --> IObservable<ChannelSetState>
Run Code Online (Sandbox Code Playgroud)

这是函数:

Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
    channelStates =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();
Run Code Online (Sandbox Code Playgroud)

IObservable<ChannelState>为了使其发挥作用,每个人都必须表现得正确,这一点很重要IObservable<IObservable<ChannelState>>

我假设ChannelState枚举有一个Idle状态,并且每个状态在完成之前IObservable<ChannelState>都会生成零个或多个Operational/Idle值对(Operational后跟Idle)。

另外,您还说过“可以添加和删除通道集合”——从这方面考虑听起来IEnumerable<IObservable<ChannelState>>很合理——但在 Rx 中,您不必担心删除,因为每个可观察的都可以发出自己的完成信号。一旦它发出完成信号,就好像它已从集合中删除,因为它无法产生任何进一步的值。因此,您只需担心添加到集合中 - 这很容易使用主题。

现在可以像这样使用该函数:

var channelStatesSubject = new Subject<IObservable<ChannelState>>();
var channelStates = channelStatesSubject.AsObservable();
var channelSetStates = f(channelStates);

channelSetStates.Subscribe(css => { /* ChannelSetState subscription code */ });

channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
// etc
Run Code Online (Sandbox Code Playgroud)

我使用一些测试代码运行此代码,该代码使用三个随机可ChannelState观察量,并Dof函数中调用进行调试,并得到以下序列:

1
Up
2
3
2
1
2
1
0
Down
1
Up
0
Down
Run Code Online (Sandbox Code Playgroud)

我想这就是你所追求的。如果我错过了什么,请告诉我。


根据下面的评论,ChannelState枚举有多种状态,但仅Operational意味着连接已建立。因此,添加一个DistinctUntilChanged运算符来隐藏多个“按下”状态非常容易。现在是代码:

Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
    channelStates =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .DistinctUntilChanged()
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();
Run Code Online (Sandbox Code Playgroud)

添加了代码以确保第一个选择查询始终以1. 现在是代码:

Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
    channelStates =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .StartWith(1)
            .DistinctUntilChanged()
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();
Run Code Online (Sandbox Code Playgroud)