如何对 Observable 使用 AsyncLocalStorage?

TmT*_*ron 8 node.js rxjs nestjs async-hooks

我想在NestJs 拦截器中使用AsyncLocalStorage

export interface CallHandler<T = any> {
    handle(): Observable<T>;
}
export interface NestInterceptor<T = any, R = any> {
    intercept(context: ExecutionContext, next: CallHandler<T>): Observable<R> | Promise<Observable<R>>;
}
Run Code Online (Sandbox Code Playgroud)

拦截器函数获取 anext CallHandler返回一个Observable

在这种情况下我不能使用run(运行回调将在callHandler.handle()可观察对象完成之前立即退出):

  intercept(context: ExecutionContext, callHandler: CallHandler): Observable<any> | Promise<Observable<any>> {
    const asyncLocalStorage = new AsyncLocalStorage();
    const myStore = {  some: 'data'};
    return asyncLocalStorage.run(myStore, () => callHandler.handle());
  }
Run Code Online (Sandbox Code Playgroud)

请参阅损坏的 replit-example

我想出的解决方案是这样的:

const localStorage = new AsyncLocalStorage();

export class MyInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, callHandler: CallHandler): Observable<any> | Promise<Observable<any>> {
    const resource = new AsyncResource('AsyncLocalStorage', { requireManualDestroy: true });
    const myStore = { some: 'data' };

    localStorage.enterWith(myStore);
    return callHandler.handle().pipe(
      finalize(() => resource.emitDestroy())
    );
  }
}
Run Code Online (Sandbox Code Playgroud)

请参阅工作重绘示例

这似乎工作正常,但我不确定这是否真的正确 - 而且它看起来很混乱且容易出错。所以我想知道:

  1. 这完全正确吗?
  2. 有没有更好/更干净的方法来处理这个问题?

小智 0

这是cls-hooks的解决方案:

return new Observable(observer => {
  namespace.runAndReturn(async () => {
    namespace.set("some", "data")   
    next.handle()
        .subscribe(
          res => observer.next(res), 
          error => observer.error(error),
          () => observer.complete()
        )
  })
})
Run Code Online (Sandbox Code Playgroud)