在反应式编程中循环之间的依赖关系

Ste*_*eve 7 c# f# reactive-programming system.reactive reactivex

在涉及反应式编程时,我经常会遇到两个流相互依赖的情况.解决这些案件的惯用方法是什么?

一个最小的例子:有按钮A和B,都显示一个值.单击A必须将A的值增加B.单击B必须将B的值设置为A.

我可以提出第一个解决方案(例如在F#中,但欢迎任何语言的答案):

let solution1 buttonA buttonB =
    let mutable lastA = 0
    let mutable lastB = 1
    let a = new Subject<_> ()
    let b = new Subject<_> ()
    (OnClick buttonA).Subscribe(fun _ -> lastA <- lastA + lastB; a.OnNext lastA) 
    (OnClick buttonB).Subscribe(fun _ -> lastB <- lastA; b.OnNext lastB)
    a.Subscribe(SetText buttonA)
    b.Subscribe(SetText buttonA)
    a.OnNext 0
    b.OnNext 1
Run Code Online (Sandbox Code Playgroud)

这个解决方案使用可变状态和主题,它不是非常易读,也不像惯用语.

我尝试的第二个解决方案涉及创建一个将两个相关流链接在一起的方法:

let dependency (aGivenB: IObservable<_> -> IObservable<_>) (bGivenA: IObservable<_> -> IObservable<_>) =
    let bProxy = new ReplaySubject<_> () 
    let a = aGivenB bProxy
    let b = bGivenA a
    b.Subscribe(bProxy.OnNext)
    a, b

let solution2 buttonA buttonB =
    let aGivenB b =
        Observable.WithLatestFrom(OnClick buttonA, b, fun click bValue -> bValue)
                  .Scan(fun acc x -> acc + x)
                  .StartWith(0)
    let bGivenA a =
        Observable.Sample(a, OnClick buttonB)
                  .StartWith(1)
    let a, b = dependency aGivenB bGivenA
    a.Subscribe(SetText buttonA)
    b.Subscribe(SetText buttonB)
Run Code Online (Sandbox Code Playgroud)

这看起来好一点,但由于dependency在反应库中没有类似的方法,我相信存在更惯用的解决方案.使用第二种方法也很容易引入无限递归.

建议的方法是解决涉及流之间循环依赖的问题,例如上面的例子中的反应式编程?

Shl*_*omo 3

编辑

这是一个 F# 解决方案:

type DU = 
    | A 
    | B 

type State = { AValue : int; BValue : int }

let solution2 (aObservable:IObservable<_>, bObservable:IObservable<_>) = 

    let union = aObservable.Select(fun _ -> A).Merge(bObservable.Select(fun _ -> B))

    let result = union.Scan({AValue = 0; BValue = 1}, fun state du -> match du with
        | A -> { state with AValue = state.AValue + state.BValue }
        | B -> { state with BValue = state.AValue }
    )

    result
Run Code Online (Sandbox Code Playgroud)

得益于内置的可区分联合和记录,F# 实际上是一种很棒的语言。这是用 C# 编写的答案,带有自定义的受歧视联盟;我的 F# 已经生锈了。

诀窍是使用可区分联合将两个可观察量转变为一个可观察量。所以基本上将 a 和 b 联合成一个受歧视联合的可观察值:

a : *---*---*---**
b : -*-*--*---*---
du: ab-ba-b-a-b-aa
Run Code Online (Sandbox Code Playgroud)

完成后,您就可以对该项目是“A”推送还是“B”推送做出反应。

只是为了确认,我假设没有办法显式设置ButtonA/ButtonB 中嵌入的值。如果有的话,这些变化应该被建模为可观察的,并且也被纳入受歧视的联盟中。

var a = new Subject<Unit>();
var b = new Subject<Unit>();
var observable = a.DiscriminatedUnion(b)
    .Scan(new State(0, 1), (state, du) => du.Unify(
        /* A clicked case */_ => new State(state.A + state.B, state.B), 
        /* B clicked case */_ => new State(state.A, state.A)
    )
);

observable.Subscribe(state => Console.WriteLine($"a = {state.A}, b = {state.B}"));
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
b.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
b.OnNext(Unit.Default);
Run Code Online (Sandbox Code Playgroud)

这是 C# 中依赖的类。其中大部分可以轻松转换为内置 F# 类型。

public class State /*easily replaced with an F# record */
{
    public State(int a, int b)
    {
        A = a;
        B = b;
    }

    public int A { get; }
    public int B { get; }
}

/* easily replaced with built-in discriminated unions and pattern matching */
public static class DiscriminatedUnionExtensions
{
    public static IObservable<DiscriminatedUnionClass<T1, T2>> DiscriminatedUnion<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
    {
        return Observable.Merge(
            a.Select(t1 => DiscriminatedUnionClass<T1, T2>.Create(t1)),
            b.Select(t2 => DiscriminatedUnionClass<T1, T2>.Create(t2))
        );
    }

    public static IObservable<TResult> Unify<T1, T2, TResult>(this IObservable<DiscriminatedUnionClass<T1, T2>> source,
        Func<T1, TResult> f1, Func<T2, TResult> f2)
    {
        return source.Select(union => Unify(union, f1, f2));
    }

    public static TResult Unify<T1, T2, TResult>(this DiscriminatedUnionClass<T1, T2> union, Func<T1, TResult> f1, Func<T2, TResult> f2)
    {
        return union.Item == 1
            ? f1(union.Item1)
            : f2(union.Item2)
        ;
    }
}

public class DiscriminatedUnionClass<T1, T2>
{
    private readonly T1 _t1;
    private readonly T2 _t2;
    private readonly int _item;
    private DiscriminatedUnionClass(T1 t1, T2 t2, int item)
    {
        _t1 = t1;
        _t2 = t2;
        _item = item;
    }

    public int Item
    {
        get { return _item; }
    }

    public T1 Item1
    {
        get { return _t1; }
    }

    public T2 Item2
    {
        get { return _t2; }
    }

    public static DiscriminatedUnionClass<T1, T2> Create(T1 t1)
    {
        return new DiscriminatedUnionClass<T1, T2>(t1, default(T2), 1);
    }

    public static DiscriminatedUnionClass<T1, T2> Create(T2 t2)
    {
        return new DiscriminatedUnionClass<T1, T2>(default(T1), t2, 2);
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 这种方法看起来更干净,谢谢!我注意到可以直接定义 `aObservable.Select(fun _ state -&gt; { state with AValue = state.AValue + state.BValue })...` ,这会将 Scan 语句简化为 `.Scan({AValue =0; BValue=1},有趣的状态 f -&gt; f 状态)`并消除对 DU 的需要。 (2认同)