vit*_*y-t 6 asynchronous iterable promise rxjs typescript
我在实现一个把多种可迭代对象混合组合(combine)起来的逻辑时陷入了困境:我有一个由 Iterable、Iterator、AsyncIterable 和 AsyncIterator 混合组成的列表,想把它们组合在一起,得到与 RxJS 的 combineLatestWith 相同的输出。
(请参阅底部完整游乐场的链接)
/**
 * Combines one async iterable with any number of further iterables/iterators into a
 * single async iterable that yields arrays holding one value per source.
 *
 * The first `next()` waits for one value from every source. Later calls yield a copy
 * of the `latest` array after a source has produced a new value.
 *
 * @param iterable - first source; its `Symbol.asyncIterator` method is called directly.
 * @param values - further sources (sync iterable, async iterable, or an iterator object).
 * @returns an async iterable of value arrays, indexed in argument order.
 */
function combineAsync<T>(iterable: AsyncIterable<T>, ...values: AnyIterable<T>[]): AsyncIterable<any[]> {
return {
[Symbol.asyncIterator](): AsyncIterator<T[]> {
// One iterator per source: Symbol.iterator is preferred, then Symbol.asyncIterator;
// anything else is used as-is (i.e. treated as an iterator object already).
const list: AnyIterator<any>[] = [
iterable[Symbol.asyncIterator](),
...values.map((v: any) => typeof v[Symbol.iterator] === 'function' ? v[Symbol.iterator]() :
(typeof v[Symbol.asyncIterator] === 'function' ? v[Symbol.asyncIterator]() : v))
];
// Used as a race participant that can never win.
const pending = new Promise(() => {
// forever-pending promise
});
// start         - promise of the very first combined result (set on the first next()).
// finished      - once true, next() only returns {done: true}.
// latest        - most recent value seen from each source, by index.
// changed       - set when a source delivered a value that has not been emitted yet.
// finishedCount - number of `done` results observed so far.
// lastError     - first captured rejection, wrapped so a falsy error is still detectable.
let start: Promise<IteratorResult<any[]>>, finished: boolean, latest: any[] = new Array(list.length),
changed = false, finishedCount = 0, lastError: { err: any } | null;
return {
next(): Promise<IteratorResult<any>> {
if (!start) {
// First call: request one value from every iterator and wait for all of them.
start = Promise.all(list.map(a => a.next())).then(all => {
const value = [];
for (let i = 0; i < all.length; i++) {
const m = all[i];
if (m.done) {
// A source that is already exhausted ends the whole combination;
// its `done` result is returned unchanged.
finished = true;
return m;
}
value.push(m.value);
}
latest = [...value];
return {value, done: false};
});
return start;
}
if (!finished) {
// Issues a fresh next() on every iterator still present in `list` and returns one
// promise per slot. Slots whose iterator was removed contribute `pending`.
const getValues = () => list.map((a, index) => {
if (!a) {
return pending;
}
const p = a.next() as any;
// Sync iterators return a plain result object; wrap it so both kinds are thenable.
const it = typeof p.then === 'function' ? p : Promise.resolve(p);
return it.then((v: any) => {
if (v.done) {
list[index] = null as any; // stop requesting values;
// NOTE(review): this runs once per `done` result, so several already-issued
// next() calls on the same exhausted iterator each increment the counter.
// Verify that `=== list.length` can still be hit by the race being awaited.
if (++finishedCount === list.length) {
return true; // the end;
}
return pending;
}
latest[index] = v.value;
changed = true;
}).catch((err: any) => {
// Only the first error is kept; the handler resolves, so the race still settles.
lastError = lastError || {err};
});
});
return start
.then(() => {
if (lastError) {
// Surface a captured error once, then clear it.
const r = Promise.reject(lastError.err);
lastError = null;
return r;
}
if (changed) {
// A value arrived since the last emission: emit without requesting more.
changed = false;
return {value: [...latest], done: false};
}
// NOTE(review): every call reaching this point invokes getValues() again, so new
// next() requests are issued even while requests from earlier calls are unsettled.
// Those earlier promises still update `latest`/`changed` when they resolve, but
// they belong to a race that has already settled.
return Promise.race(getValues()).then(end => {
if (end) {
finished = true;
return {value: undefined, done: true};
}
changed = false;
return {value: [...latest], done: false};
});
});
}
return Promise.resolve({value: undefined, done: true});
}
};
}
};
}
Run Code Online (Sandbox Code Playgroud)
所以当我传递 3 个参数时:p1, p2(8), p3(7),定义如下......
/**
 * Combines one async iterable with any number of further iterables/iterators into a
 * single async iterable that yields arrays holding one value per source.
 *
 * The first `next()` waits for one value from every source. Later calls yield a copy
 * of the `latest` array after a source has produced a new value.
 *
 * @param iterable - first source; its `Symbol.asyncIterator` method is called directly.
 * @param values - further sources (sync iterable, async iterable, or an iterator object).
 * @returns an async iterable of value arrays, indexed in argument order.
 */
function combineAsync<T>(iterable: AsyncIterable<T>, ...values: AnyIterable<T>[]): AsyncIterable<any[]> {
return {
[Symbol.asyncIterator](): AsyncIterator<T[]> {
// One iterator per source: Symbol.iterator is preferred, then Symbol.asyncIterator;
// anything else is used as-is (i.e. treated as an iterator object already).
const list: AnyIterator<any>[] = [
iterable[Symbol.asyncIterator](),
...values.map((v: any) => typeof v[Symbol.iterator] === 'function' ? v[Symbol.iterator]() :
(typeof v[Symbol.asyncIterator] === 'function' ? v[Symbol.asyncIterator]() : v))
];
// Used as a race participant that can never win.
const pending = new Promise(() => {
// forever-pending promise
});
// start         - promise of the very first combined result (set on the first next()).
// finished      - once true, next() only returns {done: true}.
// latest        - most recent value seen from each source, by index.
// changed       - set when a source delivered a value that has not been emitted yet.
// finishedCount - number of `done` results observed so far.
// lastError     - first captured rejection, wrapped so a falsy error is still detectable.
let start: Promise<IteratorResult<any[]>>, finished: boolean, latest: any[] = new Array(list.length),
changed = false, finishedCount = 0, lastError: { err: any } | null;
return {
next(): Promise<IteratorResult<any>> {
if (!start) {
// First call: request one value from every iterator and wait for all of them.
start = Promise.all(list.map(a => a.next())).then(all => {
const value = [];
for (let i = 0; i < all.length; i++) {
const m = all[i];
if (m.done) {
// A source that is already exhausted ends the whole combination;
// its `done` result is returned unchanged.
finished = true;
return m;
}
value.push(m.value);
}
latest = [...value];
return {value, done: false};
});
return start;
}
if (!finished) {
// Issues a fresh next() on every iterator still present in `list` and returns one
// promise per slot. Slots whose iterator was removed contribute `pending`.
const getValues = () => list.map((a, index) => {
if (!a) {
return pending;
}
const p = a.next() as any;
// Sync iterators return a plain result object; wrap it so both kinds are thenable.
const it = typeof p.then === 'function' ? p : Promise.resolve(p);
return it.then((v: any) => {
if (v.done) {
list[index] = null as any; // stop requesting values;
// NOTE(review): this runs once per `done` result, so several already-issued
// next() calls on the same exhausted iterator each increment the counter.
// Verify that `=== list.length` can still be hit by the race being awaited.
if (++finishedCount === list.length) {
return true; // the end;
}
return pending;
}
latest[index] = v.value;
changed = true;
}).catch((err: any) => {
// Only the first error is kept; the handler resolves, so the race still settles.
lastError = lastError || {err};
});
});
return start
.then(() => {
if (lastError) {
// Surface a captured error once, then clear it.
const r = Promise.reject(lastError.err);
lastError = null;
return r;
}
if (changed) {
// A value arrived since the last emission: emit without requesting more.
changed = false;
return {value: [...latest], done: false};
}
// NOTE(review): every call reaching this point invokes getValues() again, so new
// next() requests are issued even while requests from earlier calls are unsettled.
// Those earlier promises still update `latest`/`changed` when they resolve, but
// they belong to a race that has already settled.
return Promise.race(getValues()).then(end => {
if (end) {
finished = true;
return {value: undefined, done: true};
}
changed = false;
return {value: [...latest], done: false};
});
});
}
return Promise.resolve({value: undefined, done: true});
}
};
}
};
}
Run Code Online (Sandbox Code Playgroud)
...我期待得到这样的东西:
[1, 2, 1]
[2, 2, 1]
[3, 2, 1]
[4, 2, 1]
[4, 2, 3]
[4, 4, 3]
[4, 4, 5]
[4, 4, 7]
[4, 6, 7]
[4, 8, 7]
Run Code Online (Sandbox Code Playgroud)
但相反,我得到以下信息:
[1, 2, 1]
[2, 2, 1]
[3, 2, 1]
[4, 2, 1]
Run Code Online (Sandbox Code Playgroud)
我花了几个小时调试这个异步怪物,但始终没弄明白:为什么异步可迭代对象产生的更新无法到达后续的 Promise.race 调用。
任何帮助深表感谢!
更新
为了证明代码内部确实产生了正确的值,这里给出另一个版本:主循环里的 console.log 被注释掉了,改为在主函数内部的另外两处输出日志。
维塔利你提出了一个有趣的问题。:) 重用已在 Promise.race() 中启动的 Promise 非常棘手,但这是可能的。
这里不处理错误和拒绝,但如果一切正常,可以稍后添加该代码。
/**
 * Wraps a single source iterator and caches its in-flight `next()` request, so the
 * same pending request can take part in several `Promise.race()` rounds without
 * asking the source for another value.
 *
 * Requesting and committing are separate steps: `next()` resolves to a commit
 * function, and only calling that function records the result (`lastValue`,
 * `_done`) and clears the cache so the following `next()` issues a new request.
 *
 * Rejections from the source are not handled here: a rejected request stays cached
 * and every later `next()` rejects with the same error.
 */
class CachedIterator<T> {
  /** Most recently committed value; `undefined` until the first commit. */
  protected lastValue: T | undefined;
  /** True once any result (value or completion) has been committed. */
  protected lastValueFetched = false;
  /** True once a `done` result has been committed. */
  public _done = false;
  /** The request currently in flight, or `undefined` when none is outstanding. */
  protected cachedIteration: Promise<() => IteratorResult<T>> | undefined;
  /** Underlying iterator; may be synchronous or asynchronous. */
  protected iterator: AsyncIterator<T> | Iterator<T>;

  /**
   * @param iterable - a sync iterable, an async iterable, or an iterator object.
   *   `Symbol.iterator` is checked first, then `Symbol.asyncIterator`; anything
   *   else is used directly as the iterator.
   * @param id - optional label; stored but not used by this class.
   */
  constructor(
    iterable: AsyncIterable<T> | Iterable<T> | AsyncIterator<T> | Iterator<T>,
    protected id?: string
  ) {
    const v = iterable as any;
    if (typeof v[Symbol.iterator] === 'function') {
      this.iterator = v[Symbol.iterator]();
    } else if (typeof v[Symbol.asyncIterator] === 'function') {
      this.iterator = v[Symbol.asyncIterator]();
    } else {
      this.iterator = v;
    }
  }

  /**
   * Returns the pending request, creating it if none is outstanding.
   *
   * @returns a commit function that records the result and returns it, or
   *   `undefined` when the source has already completed.
   */
  async next(): Promise<(() => IteratorResult<T>) | undefined> {
    if (this._done) return undefined;
    if (!this.cachedIteration) {
      // A synchronous iterator returns a plain result object with no `.then`;
      // Promise.resolve makes both kinds of iterator behave the same.
      this.cachedIteration = Promise.resolve(this.iterator.next()).then(
        (result) => () => {
          this.fetch(result);
          return result;
        }
      );
    }
    return this.cachedIteration;
  }

  /** Waits for the pending request and commits its result immediately. */
  async nextAndFetch(): Promise<void> {
    const commit = await this.next();
    if (commit) commit();
  }

  /**
   * Commits a result: clears the cache and updates `lastValue` / `_done`.
   * A `done` result only replaces `lastValue` when it carries a defined value.
   */
  protected fetch(result: IteratorResult<T>): void {
    this.cachedIteration = undefined;
    this.lastValueFetched = true;
    if (result.done) {
      this._done = true;
      if (result.value !== undefined) {
        this.lastValue = result.value;
      }
      return;
    }
    this.lastValue = result.value;
  }

  /**
   * Returns the most recently committed value, first requesting and committing one
   * if nothing has been committed yet.
   */
  async last(): Promise<T> {
    if (!this.lastValueFetched) {
      await this.nextAndFetch();
    }
    return this.lastValue!;
  }

  /** Whether a `done` result has been committed. */
  done(): boolean {
    return this._done;
  }
}
/**
 * Combines any number of sources into one async iterable that yields, after each
 * new value from any source, an array with the last value of every source.
 *
 * Each source is wrapped in a `CachedIterator`, so a request that lost one race
 * stays pending and is reused in the next race instead of being issued again.
 *
 * @param values - the sources to combine.
 * @returns an async iterable of value arrays, indexed in argument order; it
 *   completes once every source has completed.
 */
function combineAsync<T>(...values: AnyIterable<T>[]): AsyncIterable<any[]> {
  return {
    [Symbol.asyncIterator](): AsyncIterator<T[]> {
      const sources: CachedIterator<T>[] = values.map(
        (source: any, index) => new CachedIterator<T>(source, 'id' + index)
      );
      return {
        async next() {
          // Loop until a race is won by an actual value. A race won by a source's
          // completion signal produces no emission and is retried.
          for (;;) {
            if (sources.every(source => source.done())) {
              return { done: true, value: undefined };
            }
            // Only the winner is committed; the losers keep their cached pending
            // request for the next round, so every round commits exactly one result.
            const active = sources.filter(source => !source.done());
            const winner = await Promise.race(
              active.map(source => source.next())
            ).then(commit => {
              return commit ? commit() : undefined;
            });
            if (!(winner && winner.done)) {
              break;
            }
          }
          // last() also requests a first value from any source that has not
          // committed one yet.
          return Promise.all(sources.map(source => source.last())).then(snapshot => {
            return {value: snapshot, done: false}
          });
        }
      };
    }
  };
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
629 次 |
| 最近记录: |