Mao*_*ion 1 exception nestjs kafkajs nestjs-config nestjs-exception-filters
我们使用 NestJS 作为基于微服务的架构的 Typescript 框架。我们的一些部署被称为“Kafka 工作线程”,这些 pod 运行的代码实际上并不公开任何 REST 端点,而只是监听 kafka 主题并处理传入事件。
问题是,配置为希望捕获任何抛出异常的全局异常过滤器没有捕获任何内容(我们最终点头UnhandledPromiseRejection
)
异常过滤器的基本配置如下(遵循 NestJS 文档指南):
@Catch()
export class KafkaWorkerExceptionFilter implements ExceptionFilter {
private logger: AppLogger = new AppLogger(KafkaWorkerExceptionFilter.name);
catch(error: Error, host: ArgumentsHost): void {
this.logger.error('Uncaught exception', error);
}
}
Run Code Online (Sandbox Code Playgroud)
我们针对此类工作人员的控制器配置如下:
@Controller()
export class KafkaWorkerController {
private readonly logger = new AppLogger(KafkaWorkerController.name);
constructor(
) {
this.logger.log('Init');
}
@EventPattern(KafkaTopic.PiiRemoval)
async removePiiForTalent(data: IncomingKafkaMessage): Promise<void> {
await asyncDoSomething();
throw new Error('Business logic failed');
}
}
Run Code Online (Sandbox Code Playgroud)
现在,我们期望全局异常过滤器捕获从控制器处理函数内部抛出的错误(以及从嵌套在其中进行同步/异步操作的实际函数抛出的实际错误)。这不会发生。
同样,按照 NestJS 文档实现此类过滤器,我尝试了多种方法以及“注册”该过滤器的方法组合,但没有成功:
{ provide: APP_FILTER, useClass: KafkaWorkerExceptionFilter }
@UseFilters(KafkaWorkerExceptionFilter)
控制器类上方的装饰器app.useGlobalFilters(new KafkaWorkerExceptionFilter());
上使用 Nestmain.ts
app.connectMicroservice(...)
作为我们如何在“kafka-worker”配置中初始化应用程序的参考,以下是文件main.ts
:
async function bootstrap() {
const app = await NestFactory.create(KafkaWorkerAppModule, {
logger: ['error', 'warn', 'debug', 'log', 'verbose'],
});
app.use(Helmet());
app.useGlobalPipes(
new ValidationPipe({
disableErrorMessages: false,
whitelist: true,
transform: true,
}),
);
const logger: AppLogger = new AppLogger('Bootstrap');
const config: ConfigService = app.get(ConfigService);
app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
clientId: SECRET_VALUE,
brokers: [SECRET_HOST_ADDRESS],
ssl: true,
sasl: SOME_BOOLEAN_VALUE
? {
mechanism: 'plain',
username: SECRET_VALUE,
password: SECRET_VALUE,
}
: undefined,
},
consumer: {
allowAutoTopicCreation: false,
groupId: SECRET_VALUE,
},
},
});
await app.startAllMicroservices();
const port = config.servicePort || 3000;
await app.listen(port, () => {
logger.log(`Kafka Worker listening on port: ${port}`);
logger.log(`Environment: ${config.nodeEnv}`);
});
}
bootstrap();
Run Code Online (Sandbox Code Playgroud)
使用该connectMicroservice()
方法时,您正在创建一个混合应用程序。
默认情况下,混合应用程序不会继承为主(基于 HTTP)应用程序配置的全局管道、拦截器、防护程序和过滤器。要从主应用程序继承这些配置属性,请在 connectMicroservice() 调用的第二个参数(可选选项对象)中设置继承AppConfig属性,如下所示:
Run Code Online (Sandbox Code Playgroud)const microservice = app.connectMicroservice({ transport: Transport.TCP }, { inheritAppConfig: true });
因此,针对您的情况,您需要做的就是:
APP_FILTER
到KafkaWorkerAppModule
app.useGlobalFilters(new KafkaWorkerExceptionFilter())
使用main.ts
@UseFilters(KafkaWorkerExceptionFilter)
-我会避免在全局过滤器中使用这种情况inheritAppConfig
选项添加到app.connectMicroservice()
您的main.ts
:app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
clientId: SECRET_VALUE,
brokers: [SECRET_HOST_ADDRESS],
ssl: true,
sasl: SOME_BOOLEAN_VALUE
? {
mechanism: 'plain',
username: SECRET_VALUE,
password: SECRET_VALUE,
}
: undefined,
},
consumer: {
allowAutoTopicCreation: false,
groupId: SECRET_VALUE,
},
},
},
{ inheritAppConfig: true }
);
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
2779 次 |
最近记录: |