使用 Redis 实现 NestJS 外部事件总线

Jor*_*rdi 4 cqrs nestjs

我正在尝试将 Nestjs 应用程序的 cqrs 设置与外部消息服务(例如 Redis)集成。我找到了一个拉取请求和一条评论,指出自 cqrs 7.0 版本以来,我应该能够将我的查询/事件/命令总线与外部服务集成。

我一直在尝试实现这一点,但我无法从 Nestjs 中找到有关该主题的太多信息。我唯一能找到的是一个过时的配置示例和 github 上的一个开放主题,用于创建有关如何实现此功能的教程。我设法通过关闭我在 github 上找到的有关此主题的有限帮助来替换默认的发布者和订阅者,但我真的不明白如何使用它来连接到外部服务,或者这是否是最佳方法这个问题。

事件总线

import { RedisEventSubscriber } from '../busses/redisEventSubscriber';
import { RedisEventPublisher } from '../busses/redisEventPublisher';
import { OnModuleInit } from '@nestjs/common';
import { ModuleRef } from "@nestjs/core";
import { CommandBus, EventBus as NestJsEventBus } from "@nestjs/cqrs";

export class EventBus extends NestJsEventBus implements OnModuleInit {

constructor( commandBus: CommandBus, moduleRef: ModuleRef) {
  super(commandBus, moduleRef);
}

onModuleInit() {

  const subscriber = new RedisEventSubscriber();
  subscriber.bridgeEventsTo(this._subject$);
  this.publisher = new RedisEventPublisher();

  }
}
Run Code Online (Sandbox Code Playgroud)

出版商

export class RedisEventPublisher implements IEventPublisher {

publish<T extends IEvent = IEvent>(event: T) {
  console.log("Event published to Redis")
  }
}
Run Code Online (Sandbox Code Playgroud)

订户

export class RedisEventSubscriber implements IMessageSource {

  bridgeEventsTo<T extends IEvent>(subject: Subject<T>) {
    console.log('bridged event to thingy')
  }
}
Run Code Online (Sandbox Code Playgroud)

如果之前使用外部消息系统设置过 Nestjs 的任何人都可以分享他们的想法或分享如何正确执行此操作的资源,我们将不胜感激。

Jor*_*rdi 10

因此,几天后,我设法找到了两种连接外部事件总线的方法。我发现我实际上并不需要外部命令或查询总线,因为它们是通过 API 调用传入的。因此,如果您想使用 NestJS 连接到外部事件总线,这里有我发现的两个选项:

  1. 通过自定义发布者和订阅者
  2. 通过 NestJS 微服务包

这两种方法的主要区别在于它们连接到外部事件总线的方式以及它们如何处理传入消息。根据您的需求,一个可能比另一个更适合您,但我选择了第一个选择。

自定义发布者和订阅者

在我的应用程序中,我已经通过使用 NestJS 中的 EventBus 类并调用我的事件来手动发布对.publish()我的事件总线的调用。我创建了一个围绕本地 NestJS 事件总线以及自定义发布者和自定义订阅者的服务。

eventBusService.ts

export class EventBusService implements IEventBusService {
  
  constructor(
    private local: EventBus, // Injected from NestJS CQRS Module
    @Inject('eventPublisher') private publisher: IEventPublisher,
    @Inject('eventSubscriber') subscriber: IMessageSource) {
      subscriber.bridgeEventsTo(this.local.subject$);
   }
  
  publish(event: IEvent): void {
    this.publisher.publish(event);
  };
} 
Run Code Online (Sandbox Code Playgroud)

事件服务使用自定义订阅者将任何传入事件从远程事件总线重定向到本地事件总线.bridgeEventsTo()。自定义订阅者使用 redis NPM 包的客户端与 eventbus 进行通信。

订阅者.ts

export class RedisEventSubscriber implements IMessageSource {

  constructor(@Inject('redisClient') private client: RedisClient) {}

  bridgeEventsTo<T extends IEvent>(subject: Subject<T>) {
    this.client.subscribe('Foo');
    this.client.on("message", (channel: string, message: string) => {

      const { payload, header } = JSON.parse(message);
      const event = Events[header.name];

      subject.next(new event(data.event.payload));
    });
  }
};
Run Code Online (Sandbox Code Playgroud)

该函数还包含将传入的 Redis 事件映射为事件的逻辑。为此,我创建了一个字典,将所有事件保存在 app.module 中,以便我可以查找该事件是否知道如何处理传入事件。然后它调用subject.next()一个新事件,以便将其放在内部 NestJS 事件总线上。

出版商.ts

为了根据我自己的事件更新其他系统,我创建了一个将数据发送到 Redis 的发布者。

export class RedisEventPublisher implements IEventPublisher {

  constructor(@Inject('redisClient') private client: RedisClient) {}

  publish<T extends IEvent = IEvent>(event: T) {
    const name = event.constructor.name;
    const request = {
      header: {
        name
      },
      payload: {
        event
      }
    }
    this.client.publish('Foo', JSON.stringify(request));
  }
}
Run Code Online (Sandbox Code Playgroud)

就像订阅者一样,此类使用 NPM 包客户端将事件发送到 Redis eventBus。


微服务设置

某些部分的微服务设置与自定义事件服务方法非常相似。它使用相同的发布者类,但订阅设置的方式不同。它使用 NestJS 微服务包来设置一个微服务,用于侦听传入消息,然后调用 eventService 将传入事件发送到事件总线。

事件服务.ts

export class EventBusService implements IEventBusService {
  
  constructor(
    private eventBus: EventBus,
    @Inject('eventPublisher') private eventPublisher: IEventPublisher,) {
   }
  
  public publish<T extends IEvent>(event: T): void {

    const data = {
      payload: event,
      eventName: event.constructor.name
    }
    
    this.eventPublisher.publish(data);
  };

  async handle(string: string) : Promise<void> {

    const data = JSON.parse(string);
    const event = Events[data.event.eventName];

    if (!event) {
      console.log(`Could not find corresponding event for 
      ${data.event.eventName}`);
    };

    await this.eventBus.publish(new event(data.event.payload));
  }
} 
Run Code Online (Sandbox Code Playgroud)

NestJS 有关于如何设置混合服务的文档,可以在此处找到。微服务包为您提供了一个@EventPattern()装饰器,您可以使用它来为传入的 eventbus 消息创建处理程序,您只需将它们添加到 NestJS 控制器并注入 eventService 即可。

控制器.ts

@Controller()
export default class EventController {

  constructor(@Inject('eventBusService') private eventBusService: 
  IEventBusService) {}

  @EventPattern(inviteServiceTopic)
  handleInviteServiceEvents(data: string) {
    this.eventBusService.handle(data)
  }
}
Run Code Online (Sandbox Code Playgroud)

由于我不想创建一个混合应用程序只是为了侦听传入事件,因此我决定采用第一个选项。代码很好地组合在一起,而不是使用带有@EventPattern()装饰器的随机控制器。

这花了相当长的时间才弄清楚,所以我希望它对将来的人有所帮助。:)