Nestjs 基于事件的消息传递,具有 1 个生产者和 2 个消费者

J. *_*zer 5 microservices nestjs

使用 Nestjs 微服务,您可以使用基于请求/响应的方法发送消息并接收结果。@MessagePattern这是通过和的组合来实现的client.send('my_pattern', myData)。有关示例,请参阅嵌套文档:https://docs.nestjs.com/microservices/basics#request-responsehttps://docs.nestjs.com/microservices/basics#sending-messages

我如何接收基于事件的方法的结果?

假设您有一个用户微服务和一个身份验证微服务。每当创建用户时,您都希望创建一个身份验证主题(保存用户名和密码的哈希值,以便用户可以使用 api 请求登录到身份验证微服务而不是用户服务)。

auth/auth.controller.ts

  @EventPattern('EVT_USER_CREATED')
  public async handleUserCreated(data: any): Promise<AuthSubject> {
    if (!data.username || !data.password) {
      throw new RpcException('Auth subject must supply username and password');
    } 
    const newSubject: CreateAuthSubject = {
      username: data.username,
      password: data.password,
      email: data.email ?? '',
    };
    const sub = await this.subjectService.create(subject);
    return sub;
  }
Run Code Online (Sandbox Code Playgroud)

用户/用户.controller.ts

  @Post('')
  @ApiBody({ type: CreateUser })
  @ApiCreatedResponse({ type: User })
  public async create(@Body() user: CreateUser): Promise<User> {
    const newUser = await this.userService.create(user);

    this.userQueue
      .emit<any, CreateUser>('EVT_USER_CREATED', user)
      .subscribe((res) => {
        console.log(res);   // undefined
      });

    return newUser;
  }

Run Code Online (Sandbox Code Playgroud)

为了验证我的设置没有错误,我更改@EventPattern@MessagePattern和。它有效,即 res 是一个有效的身份验证主题,其用户名和密码符合预期。然而,使用这个问题中概述的基于事件的方法总是(无论身份验证控制器是否返回或抛出this.userQueue.emit<...this.userQueue.send<...res undefinedhandleUserCreated

最终我想实现以下目标:如果另一个微服务需要处理“EVT_USER_CREATED”事件,我只需@EventPattern('EVT_USER_CREATED') public async handleUserCreated向其控制器添加一个方法。然后,可观察对象this.userQueue.emit<any, CreateUser>('EVT_USER_CREATED', user)将收到两个结果:对于使用用户创建事件的每个微服务一次。

因此,假设我添加第三个微服务:负责保存支付信息、订单历史记录等的客户微服务。与身份验证服务一样,它订阅“EVT_USER_CREATED”。

客户/客户.controller.ts

  @EventPattern('EVT_USER_CREATED')
  public async handleUserCreated(data: any): Promise<AuthSubject> {    
    const customer = await this.customerService.create(data);
    return customer ;
  }
Run Code Online (Sandbox Code Playgroud)

现在,通过上述设置,微服务身份验证和客户将交替接收事件:如果用户微服务发出用户的创建,则只有身份验证服务会对其做出反应并从帽子用户创建身份验证主题。不会为该用户创建任何客户。对于在用户微服务中创建的下一个用户,将仅创建客户,但不会创建身份验证主题。第三个创建的用户将再次由身份验证微服务使用,但不会由客户微服务使用。等等。

                                           -- auth microservice 
                                         /
user microservice --- message broker ---
                                         \
                                           -- customer microservice
Run Code Online (Sandbox Code Playgroud)

总结一下:如何实现图中所示的消息传递架构,这样我只需要emit(...)user.controller.ts和 中进行一次调用,这样我就可以在该调用的订阅中收到两个响应emit(...)

小智 0

可能有点晚了,但我为未来的用户留下了我的贡献。

对于这种强基于事件的架构,建议使用像Kafka这样的消息代理,不同的服务可以订阅不同的主题,甚至不同的服务可以监听同一个主题,这样就可以做出反应以不同的方式处理同一事件。

Kafka 还提供了许多优势,当您想要扩展微服务时,这些优势非常有用,此外,它还得到 Nest 的支持。 https://docs.nestjs.com/microservices/kafka

您应该记住的是,当使用消息传递系统时,通信通常是完全异步的,这是一个重要的细节,因为根据您提出的案例,您不仅可以通过 kafka 向另一个微服务发送消息,验证用户的凭据,但需要向客户端返回响应。对于这种情况,nest 提供的工具是有限的,因为使用 @MessagePattern 装饰器,我们可以在控制器中接收来自 kafka 的消息,但我们不能等待响应(同一主题或另一个主题)并确认成功响应应客户要求。在这种情况下,您可以方便地使用 Kafkajs ( https://kafka.js.org/ ) 创建自己的传输器或自己的 kafka 客户端,以便在必要时可以保留用户的请求,直到收到确认另一个话题。

我在某些论坛中看到的一个解决方案是在内存中保存一个对象(键/值),其中包含与用户的ID关联的请求的“响应”对象,这样您就可以通过kafka或任何其他方式发送消息其他代理执行特定操作,并接收另一个控制器的确认,检索用户的“Request”对象,然后发送响应(Request.send (...))。这不是理想的解决方案,但它适用于某些情况。

例如:

@Controller('users')
class MyController {
    constructor(private kafkaClient: KafkaClient) {}
    private users: new Map<string, Request>() // Or private users: any = {}

    onModuleInit() {
        // By default, NestJs create topic "users-topic.reply"
        this.kafkaClient.subscribeToResponseOf('users-topic')
    }

    @Post('auth')
    authUser(@Req req: Request, @Res res: Response): void {
        const userData = req.body;
        const { id } = userData;

        // Save request object in private variable;
        this.users.set(id, res); // or this.users[id] = res;

        // Send kafka message to validate user data
        this.kafkaClient.emit('users-topic')
    }

    @MessagePattern('users-topic.reply')
    authUser(@Payload() data: any): any {
        if (data.message === 'success') {
            const res:Request = this.users.get(data.id); // or this.users[data.id];
            
            // Clean users object
            this.users.remove(data.id); // or this.users[data.id] = null;

            // Response client request
            res.send('Authentication successfully')
        }
    }
}
Run Code Online (Sandbox Code Playgroud)