有条件地结合可观察量

Che*_*ron 5 c# system.reactive

我有两个可观察者,一个IObservable<AlertData>和另一个IObservable<SoundRequestData>.AlertData包含一个属性Id,知道哪个SoundRequestData属于它.SoundRequestData只知道自己,并有一个Id可以与之相匹配的属性AlertData.

我想将这两种数据类型组合成一种新类型AlertDataViewModel.但是,我无法确定两个可观察数据中的数据顺序是否相同.我现在不关心输出中的顺序.

我要的是匹配AlertDataSoundRequestData.

我现在这样做的方式,虽然有效但很慢,但要等到其中一个观察者完成将所有数据提取到a中ObservableCollection.然后我启动另一个observable并匹配Id的.

有没有更好的方法呢?我想这可以表达为以下大理石图:

Imgur

所以a.id=1匹配3.id=1,b.id=2匹配4.id=2等等.

Tim*_*lds 2

首先我们来介绍一下IObserver<T>.

public static IObserver<T> Safe<T>(this IObserver<T> observer)
{
    var done = false;
    return Observer.Create<TResult>(
        value =>
        {
            if (!done)
            {
                observer.OnNext(value);
            }
        },
        error =>
        {
            if (!done)
            {
                done = true;
                observer.OnError(error);
            }
        },
        () =>
        {
            if (!done)
            {
                done = true;
                observer.OnCompleted();
            }
        });
}
Run Code Online (Sandbox Code Playgroud)

这只是确保在模式中调用观察者OnNext*(OnError|OnCompleted),并且忽略违反的行为。

我们现在可以通过按键缓冲两个序列中的值来实现您所描述的运算符,并且仅当两个序列之间有键匹配时才发出它们。

public static IObservable<TResult> Join<T1, T2, TKey, TResult>(
    IObservable<T1> source1,
    IObservable<T2> source2,
    Func<T1, TKey> key1,
    Func<T2, TKey> key2,
    Func<T1, T2, TResult> selector)
{
    return Observable.Create<TResult>(observer =>
    {
        var dict1 = new Dictionary<TKey, T1>();
        var dict2 = new Dictionary<TKey, T2>();
        var gate = new object();
        var safeObserver = observer.Safe();
        Action<TKey> emit = k =>
        {
            T1 value1;
            T2 value2;
            if (dict1.TryGetValue(k, out value1) && dict2.TryGetValue(k, out value2))
            {
                var result = selector(value1, value2);
                safeObserver.OnValue(result);
                dict1.Remove(k);
                dict2.Remove(k);
            }
        };
        return new CompositeDisposable(
            source1.Synchronize(gate).Subscribe(
                value1 =>
                {
                    var k = key1(value1);
                    dict1[k] = value1;
                    emit(k);
                },
                safeObserver.OnError,
                safeObserver.OnCompleted),
            source2.Synchronize(gate).Subscribe(
                value2 =>
                {
                    var k = key2(value2);
                    dict2[k] = value2;
                    emit(k);
                },
                safeObserver.OnError,
                safeObserver.OnCompleted));
    });
}
Run Code Online (Sandbox Code Playgroud)

例子:

IObservable<AlertData> alertDatas = ...;
IObservable<SoundRequestData> = soundRequestDatas = ...;
IObservable<AlertDataViewModel> alertDataViewModels = Join(
    alertDatas,
    soundRequestDatas,
    alertData => alertData.Id,
    soundRequestData => soundRequestData.Id,
    (alertData, soundRequestData) => new AlertDataViewModel
    {
        AlertData = alertData,
        SoundRequestData = soundRequestData
    });
Run Code Online (Sandbox Code Playgroud)