将修改后的数组发布到Observable

Ale*_*mov 1 javascript reactive-programming rxjs typescript angular

任务

假设我们实现了Angular服务,并且需要向Observable<number[]>世界发布:

numbers: Observable<number[]>;
Run Code Online (Sandbox Code Playgroud)

我们希望订户:

  1. 订阅时获得最后的价值
  2. 每次我们更改并发布时,都会收到整个修改后的数组

从#1开始,内部Observable<number[]>应至少为a BehaviorSubject<number[]>

但是2号呢?假设我们需要实现一个方法publishNumbersChange(),该方法在我们需要更改和发布更改后的数组时被调用:

private publishNumbersChange() {
    // Get current numbers array
    ...        

    changeArray();

    // Now publish changed numbers array
    ...
}
Run Code Online (Sandbox Code Playgroud)

问题

RxJS 5模式是什么,用于实现根据其先前项目发布修改后的数组的任务?

因为我问这个问题主要是因为当前我正在做Angular的东西,所以这里是问题的第二部分:
Angular(以及基于RxJS的类似框架)在提供Observable哪种类型参数是数组时会使用什么代码?随后发布更新的数组?
他们只是单独保留当前发布的数组的副本吗?

一些想法

看来,分别存储基础数组是最简单的事情,因此我们始终可以访问它。但是同时,它看起来不像RxJS方式(需要在RxJS流外部具有状态)。

另一方面,我们可以执行以下操作:

private publishNumbersChange() {
    // To get the latest value from the stream, we have to subscribe
    const subscription: Subscription = this.numbers.subscribe((numbers: number[]) => {
        // We got the last value in stream in numbers argument. Now make changes to the array
        changeArray();

        // And push it back as a new value to the stream
        this.numbers.next(numbers);
    });

    // Also we have to unsubscribe
    subscription.unsubscribe();
}
Run Code Online (Sandbox Code Playgroud)

我在这里看到至少一个问题(不计算复杂性/可重用性):执行订阅回调和取消订阅之间的“竞争条件”。查看该代码,您无法确定该回调是否会真正执行。因此,这也不是执行此操作的正确方法。

bry*_*n60 6

听起来您可能正在寻找的操作员是扫描。

let arraySubject = new BehaviorSubject([]);
let array$ = arraySubject.scan((fullArray, newValue) => fullArray.concat([newValue]), [])
Run Code Online (Sandbox Code Playgroud)

扫描会随着时间在一个可观察的流中累积值,并且流中的每个项目都将最后发出的值和当前值作为参数。在它们上执行一个函数,然后发出结果。上面的示例采用了一个新值,并将其附加到完整的数组中,第二个参数将其初始化为一个空数组。

显然,这是一种限制,因为它只做一件事情,可能不够鲁棒。在这种情况下,您需要变得聪明:

let arraySubject = new BehaviorSubject([]);
let array$ = arraySubject.scan((fullArray, {modifier, payload}) => modifier(fullArray, payload), []);
Run Code Online (Sandbox Code Playgroud)

现在,您传递的是带有修饰符函数的“动作”,该修饰符定义了如何修改完整数组,以及修饰符可能需要与完整数组一起进入修饰符函数的任何其他数据的有效负载

所以你可以这样做:

let modifier = (full, item) => full.splice(full.indexOf(item), 1);
arraySubject.next({modifier, payload: itemToRemove});
Run Code Online (Sandbox Code Playgroud)

这将删除您通过发送的项目。您可以将该模式扩展为几乎所有数组修改。

但是,“扫描”的“陷阱”是订户仅从被订阅的时间中获得累积值。因此,这将发生:

let arraySubject = new BehaviorSubject([]);
let array$ = arraySubject.scan((fullArray, {modifier, payload}) => modifier(fullArray, payload), []);
let subscriber1 = array$.subscribe();
//subscriber1 gets []
let modifier = (full, val) => full.concat([val]);
arraySubject.next({modifier, payload:1});
//subscriber1 gets [1]
arraySubject.next({modifier, payload:2});
//subscriber1 gets [1,2]
let subscriber2 = array$.subscribe();
//subscriber2 gets [2]
arraySubject.next({modifier, payload:3});
//subscriber1 gets [1,2,3]
//subscriber2 gets [2,3]
Run Code Online (Sandbox Code Playgroud)

看看那里发生了什么?行为主题中存储的唯一内容是第二个事件,而不是完整阵列,扫描存储了完整阵列,因此第二个订阅者仅获得第二个动作,因为它在第一个动作期间未订阅。因此,您需要一个持久的订户模式:

let arraySubject = BehaviorSubject([]);
let arrayModifierSubject = new Subject();
arrayModifierSubject.scan((fullArray, {modifier, payload}) => modifier(fullArray, payload), []).subscribe(arraySubject);
Run Code Online (Sandbox Code Playgroud)

然后通过在arrayModifierSubject上调用next进行修改:

let modifier = (full, val) => full.concat([val]);
arrayModifierSubject.next({modifier, payload: 1});
Run Code Online (Sandbox Code Playgroud)

并且您的订阅者从数组源获取数组:

subscriber1 = arraySubject.subscribe();
Run Code Online (Sandbox Code Playgroud)

在此设置中,所有数组修改都经过修饰符主题,修饰符主题又将其广播给行为主体,后者为整个订阅者存储完整的数组并将其广播给当前订阅者。行为主题(商店主题)被永久地订阅到修饰语主题(动作主题),并且是动作主题的唯一订阅者,因此整个数组永远不会丢失,因为始终会保留动作的整个历史记录。

一些示例用法(具有以上设置):

// insert 1 at end
let modifier = (full, value) => full.concat([value]);
arrayModifierSubject.next({modifier, payload: 1});

// insert 1 at start
let modifier = (full, value) => [value].concat(full);
arrayModifierSubject.next({modifier, payload: 1});

// remove 1
let modifier = (full, value) => full.splice(full.indexOf(value),1);
arrayModifierSubject.next({modifier, payload: 1});

// change all instances of 1 to 2
let modifier = (full, value) => full.map(v => (v === value.target) ? value.newValue : v);
arrayModifierSubject.next({modifier, payload: {target: 1, newValue: 2}});
Run Code Online (Sandbox Code Playgroud)

您可以将所有这些函数包装在“ publishNumbersChange”函数中。确切地实现此方法的方式取决于您的需求,您可以使功能类似:

insertNumber(numberToInsert:number) => {
   let modifier = (full, val) => full.concat([val]);
   publishNumbersChange(modifier, numberToInsert);
}

publishNumbersChange(modifier, payload) => {
   arrayModifierSubject.next({modifier, payload});
}
Run Code Online (Sandbox Code Playgroud)

或者您可以声明一个接口并创建类并使用该接口:

publishNumbersChange({modifier, payload}) => {
   arrayModifierSubject.next({modifier, payload});
}

interface NumberArrayModifier {
    modifier: (full: number[], payload:any) => number[];
    payload: any;
}

class InsertNumber implements NumberArrayModifier {
    modifier = (full: number[], payload: number): number[] => full.concat([payload]);
    payload: number;
    constructor(numberToInsert:number) {
        this.payload = numberToInsert;
    }
}

publishNumbersChange(new InsertNumber(1));
Run Code Online (Sandbox Code Playgroud)

您还可以将类似的功能扩展到任何阵列修改。最后一个提示:lodash是在此类系统中定义修饰符的巨大帮助

那么,这在角度服务环境中会如何看待?

这是一个非常简单的实现,无法高度重用,但是其他实现可能是:

const INIT_STATE = [];
@Injectable()
export class NumberArrayService {
    private numberArraySource = new BehaviorSubject(INIT_STATE);
    private numberArrayModifierSource = new Subject();
    numberArray$ = this.numberArraySource.asObservable();

    constructor() {
        this.numberArrayModifierSource.scan((fullArray, {modifier, payload?}) => modifier(fullArray, payload), INIT_STATE).subscribe(this.numberArraySource);
    }

    private publishNumberChange(modifier, payload?) {
        this.numberArrayModifierSource.next({modifier, payload});
    }

    insertNumber(numberToInsert) {
        let modifier = (full, val) => full.concat([val]);
        this.publishNumberChange(modifier, numberToInsert);
    }

    removeNumber(numberToRemove) {
        let modifier = (full, val) => full.splice(full.indexOf(val),1);
        this.publishNumberChange(modifier, numberToRemove);
    }

    sort() {
        let modifier = (full, val) => full.sort();
        this.publishNumberChange(modifier);
    }

    reset() {
        let modifier = (full, val) => INIT_STATE;
        this.publishNumberChange(modifier);
    }
}
Run Code Online (Sandbox Code Playgroud)

用法很简单,订阅者只需订阅numberArray $并通过调用函数来修改数组。您可以根据自己的喜好使用此简单模式来扩展功能。这控制对数字数组的访问,并确保始终以api定义的方式对其进行修改,并且您的状态和主题始终相同。

可以,但是如何使它通用/可重用?

export interface Modifier<T> {
    modifier: (state: T, payload:any) => T;
    payload?: any;
}

export class StoreSubject<T> {
    private storeSource: BehaviorSubject<T>;
    private modifierSource: Subject<Modifier<T>>;
    store$: Observable<T>;

    publish(modifier: Modifier<T>): void {
        this.modifierSource.next(modifier);
    }

    constructor(init_state:T) {
        this.storeSource = new BehaviorSubject<T>(init_state);
        this.modifierSource = new Subject<Modifier<T>>();
        this.modifierSource.scan((acc:T, modifier:Modifier<T>) => modifier.modifier(acc, modifier.payload), init_state).subscribe(this.storeSource);
        this.store$ = this.storeSource.asObservable();
    }
}
Run Code Online (Sandbox Code Playgroud)

您的服务将变为:

const INIT_STATE = [];
@Injectable()
export class NumberArrayService {
    private numberArraySource = new StoreSubject<number[]>(INIT_STATE);
    numberArray$ = this.numberArraySource.store$;

    constructor() {
    }

    insertNumber(numberToInsert: number) {
        let modifier = (full, val) => full.concat([val]);
        this.numberArraySource.publish({modifier, payload: numberToInsert});
    }

    removeNumber(numberToRemove: number) {
        let modifier = (full, val) => full.splice(full.indexOf(val),1);
        this.numberArraySource.publish({modifier, payload: numberToRemove});
    }

    sort() {
        let modifier = (full, val) => full.sort();
        this.numberArraySource.publish({modifier});
    }

    reset() {
        let modifier = (full, val) => INIT_STATE;
        this.numberArraySource.publish({modifier});
    }
}
Run Code Online (Sandbox Code Playgroud)