通过原生 Promise 组合异步可迭代对象

vit*_*y-t 6 asynchronous iterable promise rxjs typescript

我陷入了困境:我试图为一个混合的可迭代对象列表实现 combine 逻辑,即我有一个由 Iterable、Iterator、AsyncIterable 和 AsyncIterator 组成的列表,我试图将它们组合在一起,以获得与 RxJS 的 combineLatestWith 相同的输出。

这是源代码的链接,下面也附上了相同的代码(另见我的操作符文档):

(请参阅底部完整游乐场的链接)

/**
 * Combines one async iterable with any number of further sources into a single
 * async iterable. Each emitted item is an array holding the most recent value
 * seen from every source, in argument order.
 *
 * `AnyIterable` / `AnyIterator` are declared outside this snippet — presumably
 * unions of the sync/async iterable and iterator types; confirm against their
 * declarations.
 *
 * @param iterable - The first source; must expose `Symbol.asyncIterator`.
 * @param values - Further sources. For each one `Symbol.iterator` is tried
 *   first, then `Symbol.asyncIterator`; anything else is used as an iterator
 *   directly.
 * @returns An async iterable of "latest values" arrays.
 *
 * NOTE(review): every `next()` call that reaches `getValues()` invokes
 * `next()` again on *all* sources that are still live, whether or not a
 * request issued by an earlier call is still outstanding. Results of those
 * earlier requests still update `latest` / `changed`, but the current
 * `Promise.race` only waits on the newly issued requests. This looks like the
 * reason emissions stop once the fastest source completes — confirm by
 * tracing the number of `next()` calls per source.
 */
function combineAsync<T>(iterable: AsyncIterable<T>, ...values: AnyIterable<T>[]): AsyncIterable<any[]> {
    return {
        [Symbol.asyncIterator](): AsyncIterator<T[]> {
            // One iterator per source; a slot is overwritten with null once that source is done.
            const list: AnyIterator<any>[] = [
                iterable[Symbol.asyncIterator](),
                ...values.map((v: any) => typeof v[Symbol.iterator] === 'function' ? v[Symbol.iterator]() :
                    (typeof v[Symbol.asyncIterator] === 'function' ? v[Symbol.asyncIterator]() : v))
            ];
            // Placeholder that can never win a Promise.race.
            const pending = new Promise(() => {
                // forever-pending promise
            });
            // start         - promise for the very first combined result (set on the first next() call)
            // finished      - set to true once the combined sequence has ended
            // latest        - most recent value per source
            // changed       - true when `latest` was updated since the last emission
            // finishedCount - number of sources that reported done after the initial round
            // lastError     - first rejection recorded by a source request, reported on a later call
            let start: Promise<IteratorResult<any[]>>, finished: boolean, latest: any[] = new Array(list.length),
                changed = false, finishedCount = 0, lastError: { err: any } | null;
            return {
                next(): Promise<IteratorResult<any>> {
                    if (!start) {
                        // First call: wait until every source has delivered its first result.
                        start = Promise.all(list.map(a => a.next())).then(all => {
                            const value = [];
                            for (let i = 0; i < all.length; i++) {
                                const m = all[i];
                                if (m.done) {
                                    // A source that is done on its first pull ends the whole sequence;
                                    // that source's own result object is returned as the final result.
                                    finished = true;
                                    return m;
                                }
                                value.push(m.value);
                            }
                            latest = [...value];
                            return {value, done: false};
                        });
                        return start;
                    }
                    if (!finished) {
                        // Issues a fresh next() request on every live source and returns one promise per slot.
                        const getValues = () => list.map((a, index) => {
                            if (!a) {
                                // Source already completed: contribute a promise that never settles.
                                return pending;
                            }
                            const p = a.next() as any;
                            // Sync iterators return a plain result object; lift it into a promise.
                            const it = typeof p.then === 'function' ? p : Promise.resolve(p);
                            return it.then((v: any) => {
                                if (v.done) {
                                    list[index] = null as any; // stop requesting values;
                                    if (++finishedCount === list.length) {
                                        return true; // the end;
                                    }
                                    // Not the last source to finish: resolving with `pending`
                                    // keeps this slot from ever settling.
                                    return pending;
                                }
                                // New value: record it; the promise resolves with undefined (falsy).
                                latest[index] = v.value;
                                changed = true;
                            }).catch((err: any) => {
                                // Remember only the first error; the slot itself resolves with undefined.
                                lastError = lastError || {err};
                            });
                        });
                        // Everything below runs only after the first combined result is available.
                        return start
                            .then(() => {
                                if (lastError) {
                                    // Report a recorded error exactly once, then clear it.
                                    const r = Promise.reject(lastError.err);
                                    lastError = null;
                                    return r;
                                }
                                if (changed) {
                                    // A request from an earlier call delivered a value in the meantime.
                                    changed = false;
                                    return {value: [...latest], done: false};
                                }
                                // Otherwise wait for whichever newly issued request settles first.
                                return Promise.race(getValues()).then(end => {
                                    if (end) {
                                        // `true` is only produced when the last live source reported done.
                                        finished = true;
                                        return {value: undefined, done: true};
                                    }
                                    changed = false;
                                    return {value: [...latest], done: false};
                                });
                            });
                    }
                    return Promise.resolve({value: undefined, done: true});
                }
            };
        }
    };
}
Run Code Online (Sandbox Code Playgroud)

所以当我传递 3 个参数时:p1, p2(8), p3(7),定义如下......

/**
 * Combines one async iterable with any number of further sources into a single
 * async iterable. Each emitted item is an array holding the most recent value
 * seen from every source, in argument order.
 *
 * `AnyIterable` / `AnyIterator` are declared outside this snippet — presumably
 * unions of the sync/async iterable and iterator types; confirm against their
 * declarations.
 *
 * @param iterable - The first source; must expose `Symbol.asyncIterator`.
 * @param values - Further sources. For each one `Symbol.iterator` is tried
 *   first, then `Symbol.asyncIterator`; anything else is used as an iterator
 *   directly.
 * @returns An async iterable of "latest values" arrays.
 *
 * NOTE(review): every `next()` call that reaches `getValues()` invokes
 * `next()` again on *all* sources that are still live, whether or not a
 * request issued by an earlier call is still outstanding. Results of those
 * earlier requests still update `latest` / `changed`, but the current
 * `Promise.race` only waits on the newly issued requests. This looks like the
 * reason emissions stop once the fastest source completes — confirm by
 * tracing the number of `next()` calls per source.
 */
function combineAsync<T>(iterable: AsyncIterable<T>, ...values: AnyIterable<T>[]): AsyncIterable<any[]> {
    return {
        [Symbol.asyncIterator](): AsyncIterator<T[]> {
            // One iterator per source; a slot is overwritten with null once that source is done.
            const list: AnyIterator<any>[] = [
                iterable[Symbol.asyncIterator](),
                ...values.map((v: any) => typeof v[Symbol.iterator] === 'function' ? v[Symbol.iterator]() :
                    (typeof v[Symbol.asyncIterator] === 'function' ? v[Symbol.asyncIterator]() : v))
            ];
            // Placeholder that can never win a Promise.race.
            const pending = new Promise(() => {
                // forever-pending promise
            });
            // start         - promise for the very first combined result (set on the first next() call)
            // finished      - set to true once the combined sequence has ended
            // latest        - most recent value per source
            // changed       - true when `latest` was updated since the last emission
            // finishedCount - number of sources that reported done after the initial round
            // lastError     - first rejection recorded by a source request, reported on a later call
            let start: Promise<IteratorResult<any[]>>, finished: boolean, latest: any[] = new Array(list.length),
                changed = false, finishedCount = 0, lastError: { err: any } | null;
            return {
                next(): Promise<IteratorResult<any>> {
                    if (!start) {
                        // First call: wait until every source has delivered its first result.
                        start = Promise.all(list.map(a => a.next())).then(all => {
                            const value = [];
                            for (let i = 0; i < all.length; i++) {
                                const m = all[i];
                                if (m.done) {
                                    // A source that is done on its first pull ends the whole sequence;
                                    // that source's own result object is returned as the final result.
                                    finished = true;
                                    return m;
                                }
                                value.push(m.value);
                            }
                            latest = [...value];
                            return {value, done: false};
                        });
                        return start;
                    }
                    if (!finished) {
                        // Issues a fresh next() request on every live source and returns one promise per slot.
                        const getValues = () => list.map((a, index) => {
                            if (!a) {
                                // Source already completed: contribute a promise that never settles.
                                return pending;
                            }
                            const p = a.next() as any;
                            // Sync iterators return a plain result object; lift it into a promise.
                            const it = typeof p.then === 'function' ? p : Promise.resolve(p);
                            return it.then((v: any) => {
                                if (v.done) {
                                    list[index] = null as any; // stop requesting values;
                                    if (++finishedCount === list.length) {
                                        return true; // the end;
                                    }
                                    // Not the last source to finish: resolving with `pending`
                                    // keeps this slot from ever settling.
                                    return pending;
                                }
                                // New value: record it; the promise resolves with undefined (falsy).
                                latest[index] = v.value;
                                changed = true;
                            }).catch((err: any) => {
                                // Remember only the first error; the slot itself resolves with undefined.
                                lastError = lastError || {err};
                            });
                        });
                        // Everything below runs only after the first combined result is available.
                        return start
                            .then(() => {
                                if (lastError) {
                                    // Report a recorded error exactly once, then clear it.
                                    const r = Promise.reject(lastError.err);
                                    lastError = null;
                                    return r;
                                }
                                if (changed) {
                                    // A request from an earlier call delivered a value in the meantime.
                                    changed = false;
                                    return {value: [...latest], done: false};
                                }
                                // Otherwise wait for whichever newly issued request settles first.
                                return Promise.race(getValues()).then(end => {
                                    if (end) {
                                        // `true` is only produced when the last live source reported done.
                                        finished = true;
                                        return {value: undefined, done: true};
                                    }
                                    changed = false;
                                    return {value: [...latest], done: false};
                                });
                            });
                    }
                    return Promise.resolve({value: undefined, done: true});
                }
            };
        }
    };
}
Run Code Online (Sandbox Code Playgroud)

...我期待得到这样的东西:

[1, 2, 1] 
[2, 2, 1] 
[3, 2, 1] 
[4, 2, 1] 
[4, 2, 3] 
[4, 4, 3] 
[4, 4, 5] 
[4, 4, 7] 
[4, 6, 7] 
[4, 8, 7] 
Run Code Online (Sandbox Code Playgroud)

但相反,我得到以下信息:

[1, 2, 1] 
[2, 2, 1] 
[3, 2, 1] 
[4, 2, 1]
Run Code Online (Sandbox Code Playgroud)

我花了几个小时调试这个异步怪物,但还是弄不清楚为什么异步迭代产生的更新到不了随后的 Promise.race 调用。

任何帮助深表感谢!

这是完整的游乐场

更新

为了证明代码中确实会产生正确的值,这里有一个修改后的版本:主要的控制台输出被注释掉,改为在主函数中的另外两处添加了输出。

Mik*_*ike 0

维塔利你提出了一个有趣的问题。:) 重用已在 Promise.race() 中启动的 Promise 非常棘手,但这是可能的。

这里不处理错误和拒绝,但如果一切正常,可以稍后添加该代码。

/**
 * Wraps one iterator and separates *requesting* its next result from
 * *committing* that result.
 *
 * `next()` starts at most one underlying request at a time and keeps handing
 * out the same pending promise until somebody commits the result by calling
 * the function that promise resolves to. This lets the same request take part
 * in several `Promise.race` rounds without a value being pulled and dropped.
 *
 * Rejections are not handled: a rejected request stays cached, so every later
 * `next()` call rejects with the same error.
 */
class CachedIterator<T>{
    /** Most recently committed value; `undefined` until something is committed. */
    protected lastValue: T | undefined;
    /** True once any result (a value or the completion signal) has been committed. */
    protected lastValueFetched: boolean = false;

    /** True once a result with `done: true` has been committed. */
    public _done = false;

    /** The outstanding request, kept until its result is committed. */
    protected cachedIteration: Promise<() => IteratorResult<T>> | undefined;

    /** Underlying iterator; may actually be a sync iterator at runtime. */
    protected iterator: AsyncIterator<T>;

    /**
     * @param iterable - Source to wrap. `Symbol.iterator` is tried first, then
     *   `Symbol.asyncIterator`; anything else is used as an iterator directly.
     * @param id - Optional label; stored but not read by this class.
     */
    constructor(
        iterable: AsyncIterable<T> | Iterable<T> | AsyncIterator<T> | Iterator<T>,
        protected id?: string
    ){
        const v = iterable as any;
        this.iterator =
        (typeof v[Symbol.iterator] === 'function' ? v[Symbol.iterator]() :
        (typeof v[Symbol.asyncIterator] === 'function' ? v[Symbol.asyncIterator]() : v)) as AsyncIterator<T>

    }

    /**
     * Requests the next result without committing it.
     *
     * @returns `undefined` when the iterator is already done; otherwise a
     *   commit function that, when called, records the result on this
     *   instance (see `fetch`) and returns it.
     */
    async next(): Promise<(() => IteratorResult<T>) | undefined>{
        if(this._done) return undefined;
        if(!this.cachedIteration){
            // A sync iterator returns a plain result object rather than a promise;
            // Promise.resolve() lifts it so both kinds of iterator work here.
            this.cachedIteration = Promise.resolve(this.iterator.next()).then(
                (result) => {
                    return () => {
                        this.fetch(result);
                        return result;
                    }
                }
            )
        }
        return this.cachedIteration
    }

    /** Requests the next result and commits it as soon as it arrives. */
    async nextAndFetch() {
        const fetch = await this.next();
        if(fetch) fetch();
    }

    /**
     * Commits a result: drops the cached request so the following `next()`
     * call pulls again, and updates `lastValue` / `_done`.
     */
    protected fetch(result: IteratorResult<T>){
        this.cachedIteration = undefined;

        this.lastValue = result.done && result.value === undefined
            ? this.lastValue   // plain completion: keep the last yielded value
            : result.value;    // a value, or a defined return value on completion
        this.lastValueFetched = true;
        if(result.done){
            this._done = true;
        }
    }

    /**
     * Returns the most recently committed value, pulling and committing one
     * result first if nothing has been committed yet.
     */
    async last(): Promise<T> {
        if(!this.lastValueFetched){
            await this.nextAndFetch();
        }
        return this.lastValue!;
    }

    /** Whether a `done: true` result has been committed. */
    done(){
        return this._done
    }
}



/**
 * Combines any number of sources into one async iterable whose items are
 * arrays holding the latest value of every source, in argument order.
 *
 * Each `next()` call races the outstanding requests of all sources that are
 * not done yet, commits exactly one winning result, and then emits the latest
 * values. Sources that have not produced anything yet are awaited before the
 * emission, so the first item is only produced once every source has
 * delivered a result.
 *
 * Errors and rejections from the sources are not handled here.
 *
 * `AnyIterable` is declared outside this snippet — presumably a union of the
 * sync and async iterable types; confirm against its declaration.
 *
 * @param values - The sources to combine.
 * @returns An async iterable that ends once every source is done.
 */
function combineAsync<T>(...values: AnyIterable<T>[]): AsyncIterable<any[]> {
    return {
        [Symbol.asyncIterator](): AsyncIterator<T[]> {
            const list: CachedIterator<T>[] =
                values.map((v: any, id) => new CachedIterator<T>(v, 'id' + id));

            return {
                async next(): Promise<IteratorResult<T[]>> {
                    // Race until a real value is committed. A committed `done` result
                    // carries nothing to emit, so the race is simply repeated.
                    for (;;) {
                        if (list.every(source => source.done())) {
                            return { done: true, value: undefined };
                        }
                        // Requests that lose this race stay cached inside their
                        // CachedIterator and take part in the next race, so no
                        // value is pulled and then dropped.
                        const commit = await Promise.race(
                            list.filter(source => !source.done()).map(source => source.next())
                        );
                        const result = commit ? commit() : undefined;
                        if (!result || !result.done) {
                            break;
                        }
                    }

                    // last() pulls and commits a first result for any source that
                    // has not delivered one yet.
                    const latest = await Promise.all(list.map(source => source.last()));
                    return { value: latest, done: false };
                }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

游乐场链接

游乐场上一个链接

  • 更新的代码 - 已修复 (2认同)