我试图在 NestJs 中发布到 kafka
async publish<T extends IEvent>(event: T) {
await this.client.connect();
await this.client.send('topic', event);
}
Run Code Online (Sandbox Code Playgroud)
但是还没有找到正确的方法,dispatchEvent 是受保护的。
编辑:使用 cqrs 所以这是在订阅 eventbus 的 eventpublisher 中。
我不相信你需要这个connect方法。你确定你订阅了消息响应吗?下面是一个带有客户端控制器和服务器控制器的工作示例:
import {
Controller,
Get,
Inject,
OnModuleDestroy,
OnModuleInit,
UseFilters,
} from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { ExceptionFilter } from './exception.filter';
@Controller()
export class KafkaClientController implements OnModuleInit, OnModuleDestroy {
constructor(@Inject('KAFKA_SERVICE') private readonly kafka: ClientKafka) {}
async onModuleInit() {
['hello', 'error', 'skip'].forEach((key) =>
this.kafka.subscribeToResponseOf(`say.${key}`),
);
}
onModuleDestroy() {
this.kafka.close();
}
@Get()
sayHello() {
return this.kafka.send('say.hello', { ip: '127.0.0.1' });
}
@Get('error')
@UseFilters(ExceptionFilter)
sayError() {
return this.kafka.send('say.error', { ip: '127.0.0.1' });
}
@Get('skip')
saySkip() {
return this.kafka.send('say.skip', { ip: '127.0.0.1' });
}
}
Run Code Online (Sandbox Code Playgroud)
import { BadRequestException, Controller, UseFilters } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
import { OgmaSkip } from '@ogma/nestjs-module';
import { AppService } from '../../app.service';
import { ExceptionFilter } from './exception.filter';
@Controller()
export class KafkaServerController {
constructor(private readonly service: AppService) {}
@MessagePattern('say.hello')
sayHello() {
return this.service.getHello();
}
@UseFilters(ExceptionFilter)
@MessagePattern('say.error')
sayError() {
throw new BadRequestException('Borked');
}
@OgmaSkip()
@MessagePattern('say.skip')
saySkip() {
return this.service.getHello();
}
}
Run Code Online (Sandbox Code Playgroud)
以上用于我正在制作的库的集成测试。您可以在此处查看完整的模块设置
| 归档时间: |
|
| 查看次数: |
2596 次 |
| 最近记录: |