NestJS:全局异常过滤器无法捕获基于 Kafka 的微服务应用程序抛出的任何内容

Mao*_*ion 1 exception nestjs kafkajs nestjs-config nestjs-exception-filters

我们使用 NestJS 作为基于微服务的架构的 Typescript 框架。我们的一些部署被称为“Kafka 工作线程”,这些 pod 运行的代码实际上并不公开任何 REST 端点,而只是监听 kafka 主题并处理传入事件。

问题是,配置为希望捕获任何抛出异常的全局异常过滤器没有捕获任何内容(我们最终点头UnhandledPromiseRejection

异常过滤器的基本配置如下(遵循 NestJS 文档指南):

@Catch()
export class KafkaWorkerExceptionFilter implements ExceptionFilter {
  private logger: AppLogger = new AppLogger(KafkaWorkerExceptionFilter.name);

  catch(error: Error, host: ArgumentsHost): void {
    this.logger.error('Uncaught exception', error);
  }
}
Run Code Online (Sandbox Code Playgroud)

我们针对此类工作人员的控制器配置如下:

@Controller()
export class KafkaWorkerController {
  private readonly logger = new AppLogger(KafkaWorkerController.name);

  constructor(
  ) {
    this.logger.log('Init');
  }

  @EventPattern(KafkaTopic.PiiRemoval)
  async removePiiForTalent(data: IncomingKafkaMessage): Promise<void> {
    await asyncDoSomething();
    throw new Error('Business logic failed');
  }
}
Run Code Online (Sandbox Code Playgroud)

现在,我们期望全局异常过滤器捕获从控制器处理函数内部抛出的错误(以及从嵌套在其中进行同步/异步操作的实际函数抛出的实际错误)。这不会发生。


同样,按照 NestJS 文档实现此类过滤器,我尝试了多种方法以及“注册”该过滤器的方法组合,但没有成功:

  • 在顶级模块定义中列为提供者{ provide: APP_FILTER, useClass: KafkaWorkerExceptionFilter }
  • 使用@UseFilters(KafkaWorkerExceptionFilter)控制器类上方的装饰器
  • 在使用kafka 配置之前/之后在文件app.useGlobalFilters(new KafkaWorkerExceptionFilter());上使用 Nestmain.tsapp.connectMicroservice(...)

作为我们如何在“kafka-worker”配置中初始化应用程序的参考,以下是文件main.ts

async function bootstrap() {
  const app = await NestFactory.create(KafkaWorkerAppModule, {
    logger: ['error', 'warn', 'debug', 'log', 'verbose'],
  });

  app.use(Helmet());
  app.useGlobalPipes(
    new ValidationPipe({
      disableErrorMessages: false,
      whitelist: true,
      transform: true,
    }),
  );
  const logger: AppLogger = new AppLogger('Bootstrap');

  const config: ConfigService = app.get(ConfigService);

  app.connectMicroservice({
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: SECRET_VALUE,
        brokers: [SECRET_HOST_ADDRESS],
        ssl: true,
        sasl: SOME_BOOLEAN_VALUE
          ? {
              mechanism: 'plain',
              username: SECRET_VALUE,
              password: SECRET_VALUE,
            }
          : undefined,
      },
      consumer: {
        allowAutoTopicCreation: false,
        groupId: SECRET_VALUE,
      },
    },
  });



  await app.startAllMicroservices();
  const port = config.servicePort || 3000;
  await app.listen(port, () => {
    logger.log(`Kafka Worker listening on port: ${port}`);
    logger.log(`Environment: ${config.nodeEnv}`);
  });
}

bootstrap();
Run Code Online (Sandbox Code Playgroud)

Zim*_*Gil 8

使用该connectMicroservice()方法时,您正在创建一个混合应用程序

默认情况下,混合应用程序不会继承为主(基于 HTTP)应用程序配置的全局管道、拦截器、防护程序和过滤器。要从主应用程序继承这些配置属性,请在 connectMicroservice() 调用的第二个参数(可选选项对象)中设置继承AppConfig属性,如下所示:

const microservice = app.connectMicroservice({
 transport: Transport.TCP
}, { inheritAppConfig: true });
Run Code Online (Sandbox Code Playgroud)

因此,针对您的情况,您需要做的就是:

  1. 添加过滤器 - 使用以下方法之一:
    1. 作为一个APP_FILTERKafkaWorkerAppModule
    2. 作为全局过滤器app.useGlobalFilters(new KafkaWorkerExceptionFilter())使用main.ts
    3. 在每个相关的提供者上使用@UseFilters(KafkaWorkerExceptionFilter)-我会避免在全局过滤器中使用这种情况
  2. inheritAppConfig选项添加到app.connectMicroservice()您的main.ts
app.connectMicroservice({
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: SECRET_VALUE,
        brokers: [SECRET_HOST_ADDRESS],
        ssl: true,
        sasl: SOME_BOOLEAN_VALUE
          ? {
              mechanism: 'plain',
              username: SECRET_VALUE,
              password: SECRET_VALUE,
            }
          : undefined,
      },
      consumer: {
        allowAutoTopicCreation: false,
        groupId: SECRET_VALUE,
      },
    },
  },
  { inheritAppConfig: true }
);
Run Code Online (Sandbox Code Playgroud)