NestJs @Sse - 事件仅由一个客户端消耗

Ron*_*nen 6 server-sent-events nestjs

我尝试了 Nest.js ( 28-SSE )提供的示例 SSE 应用程序,并修改了 sse 端点以发送计数器:

  @Sse('sse')
  sse(): Observable<MessageEvent> {
    return interval(5000).pipe(
      map((_) => ({ data: { hello: `world - ${this.c++}` }} as MessageEvent)),
    );
  }
Run Code Online (Sandbox Code Playgroud)

我预计正在侦听此 SSE 的每个客户端都会收到该消息,但是当打开多个浏览器选项卡时,我可以看到每条消息仅由一个浏览器使用,因此如果我打开三个浏览器,我会得到以下信息:

在此输入图像描述

我怎样才能得到预期的行为?

har*_*rry 6

为了实现您期望的行为,您需要为每个连接创建一个单独的流,并根据需要推送数据流。

下面是一种可能的简约解决方案

import { Controller, Get, MessageEvent, OnModuleDestroy, OnModuleInit, Res, Sse } from '@nestjs/common';
import { readFileSync } from 'fs';
import { join } from 'path';
import { Observable, ReplaySubject } from 'rxjs';
import { map } from 'rxjs/operators';
import { Response } from 'express';

@Controller()
export class AppController implements OnModuleInit, OnModuleDestroy {
  private stream: {
    id: string;
    subject: ReplaySubject<unknown>;
    observer: Observable<unknown>;
  }[] = [];
  private timer: NodeJS.Timeout;
  private id = 0;

  public onModuleInit(): void {
    this.timer = setInterval(() => {
      this.id += 1;
      this.stream.forEach(({ subject }) => subject.next(this.id));
    }, 1000);
  }

  public onModuleDestroy(): void {
    clearInterval(this.timer);
  }

  @Get()
  public index(): string {
    return readFileSync(join(__dirname, 'index.html'), 'utf-8').toString();
  }

  @Sse('sse')
  public sse(@Res() response: Response): Observable<MessageEvent> {
    const id = AppController.genStreamId();
    // Clean up the stream when the client disconnects
    response.on('close', () => this.removeStream(id));
    // Create a new stream
    const subject = new ReplaySubject();
    const observer = subject.asObservable();
    this.addStream(subject, observer, id);

    return observer.pipe(map((data) => ({
      id: `my-stream-id:${id}`,
      data: `Hello world ${data}`,
      event: 'my-event-name',
    }) as MessageEvent));
  }

  private addStream(subject: ReplaySubject<unknown>, observer: Observable<unknown>, id: string): void {
    this.stream.push({
      id,
      subject,
      observer,
    });
  }

  private removeStream(id: string): void {
    this.stream = this.stream.filter(stream => stream.id !== id);
  }

  private static genStreamId(): string {
    return Math.random().toString(36).substring(2, 15);
  }
}

Run Code Online (Sandbox Code Playgroud)

您可以为它创建一个单独的服务,使其更干净,并从不同的地方推送流数据,但作为示例展示,结果将如下面的屏幕截图所示

在此输入图像描述

  • SSE 在 NestJS 中工作的唯一要求是返回一个 Observable 流。从技术上讲,您可以返回包含静态数据的“new Observable(...)”,但在现实场景中,您不希望返回静态数据,它将是动态的。当您想将数据注入流中时,可以使用“Subject”或“ReplaySubject”,它们几乎是相同的。问题是,使用Subject/ReplaySubject,你可以将其作为可观察对象返回,并使用“next”方法将数据推送到流中,而在“new Observable()”中则不能。_(至少我是这么知道的)_ (2认同)