将 Observable 转换为异步生成器

err*_*ror 6 rxjs babeljs ecmascript-next reactive

我尝试将 rxjs 与 babeljs 结合使用来创建一个异步生成器函数,该函数在next被调用时产生,在error被调用时抛出,并在complete被调用时完成。我遇到的问题是我无法从回调中屈服。

我可以await承诺处理返回/抛出要求。

async function *getData( observable ) {
    await new Promise( ( resolve, reject ) => {
        observable.subscribe( {
            next( data ) {
                yield data; // can't yield here
            },
            error( err ) {
                reject( err );
            },
            complete() {
                resolve();
            }
        } );
    } );
}

( async function example() {
    for await( const data of getData( foo ) ) {
        console.log( 'data received' );
    }
    console.log( 'done' );
}() );
Run Code Online (Sandbox Code Playgroud)

这可能吗?

err*_*ror 6

我问了橡皮鸭,然后我写了下面的代码来完成我想要的事情:

function defer() {
    const properties = {},
        promise = new Promise( ( resolve, reject ) => {
            Object.assign( properties, { resolve, reject } );
        } );
        return Object.assign( promise, properties );
}

async function *getData( observable ) {
    let nextData = defer();
    const sub = observable.subscribe( {
        next( data ) {
            const n = nextData;
            nextData = defer();
            n.resolve( data );
        },
        error( err ) {
            nextData.reject( err );
        },
        complete() {
            const n = nextData;
            nextData = null;
            n.resolve();
        }
    } );
    try {
        for(;;) {
            const value = await nextData;
            if( !nextData ) break;
            yield value;
        }
    } finally {
        sub.unsubscribe();
    }
}
Run Code Online (Sandbox Code Playgroud)