每次生产 Kafka 消息时都需要连接吗?

Kar*_*mar 5 javascript node.js apache-kafka typescript microservices

我创建了这个类,以使代码在发送/生成 Kafka 消息时更加可重用和干净。

我正在使用Node 和 Kafka 使用 KafkaJS

对此很陌生,我无法在互联网上的任何地方找到在生产应用程序中使用它的完美/好方法。

问题(简而言之):每次生成新消息时,我们是否需要作为生产者进行连接?我们不能像Redis 或 NATS一样保持连接吗?

这是我到目前为止所尝试过的:

使用案例

假设每次创建新用户时我都需要发送一条消息。

1.创建afka客户端

创建了kafka客户端,这样我们就不必每次都重新配置它

import { Kafka } from 'kafkajs';

class KafkaClient {
  private _client: Kafka;

  get client() {
    if (!this._client) {
      throw new Error('Cannot access client before initializing it');
    } else {
      return this._client;
    }
  }
  connect(clientId: string, brokers: string[]) {
    this._client = new Kafka({
      clientId,
      brokers,
    });
  }
}

export const producerClient = new KafkaClient();
Run Code Online (Sandbox Code Playgroud)

2.创建抽象发布者类

为所有类型的生产者创建了kafka生产者抽象类

import { Kafka, Message } from 'kafkajs';
import { Topics } from './topics';

interface Event {
  topic: Topics;
  data: Message;
}

export abstract class Publisher<T extends Event> {
  private client: Kafka;
  abstract topic: Topics;
  constructor(client: Kafka) {
    this.client = client;
  }

  async publish(data: T['data']): Promise<void> {
    const producer = this.client.producer();
    await producer.connect();
    await producer.send({
      topic: this.topic,
      messages: [data],
    });
  }
}
Run Code Online (Sandbox Code Playgroud)

3.最后一个用户创建的生产者(继承自基类)

所有生产者都会有自己的这些类

import { Publisher } from './publisher';
import { Topics } from './topics';

interface Event {
  topic: Topics.USER_CREATED;
  data: {
    value: string;
  };
}

export class UserCreatedPublisher extends Publisher<Event> {
  topic: Topics = Topics.USER_CREATED;
}
Run Code Online (Sandbox Code Playgroud)

生成用户创建的事件/消息

在nodejs路由中使用它

import { Router } from 'express';
import { producerClient } from './kafka-client';
import { UserCreatedPublisher } from './user-created-publisher';

const router = Router();

router.post('/create-user', async (req, res) => {
  const { email, password } = req.body;

  // send this to the other service using kafka
    await new UserCreatedPublisher(producerClient.client).publish({
      value: JSON.stringify({ email, password }),
    console.log('Message published');

  res.status(201).send();
});
Run Code Online (Sandbox Code Playgroud)

mas*_*jon 4

问:每次生成新消息时,我们都需要作为生产者进行连接吗?我的回答:不

在这种情况下,我要做的是创建一个单例类来共享同一个 kafka 生产者实例,并在我优雅地关闭我的应用程序时断开它。

例如为kafka生产者创建一个类

import { Kafka } from 'kafkajs';

export class KafkaProducer {
  private static instance: KafkaProducer;
  private _producer = null;
  private _isConnected = false;

  private constructor() {
    const kafka = new Kafka({
      clientId: process.env.KAFKA_CLIENTID,
      brokers: [process.env.KAFKA_SERVER],
    });
    this._producer = kafka.producer();
  }

  public static getInstance(): KafkaProducer {
    if (!KafkaProducer.instance) {
      KafkaProducer.instance = new KafkaProducer();
    }
    return KafkaProducer.instance;
  }

  public get isConnected() {
    return this._isConnected;
  }

  async connect(): Promise<void> {
    try {
      await this._producer.connect();
      this._isConnected = true;
    } catch (err) {
      console.error(err);
    }
  }

  get producer() {
    return this._producer;
  }
}
Run Code Online (Sandbox Code Playgroud)

并使用此生产者在代码的其他位置发送消息(我创建了 isConnected() 方法,因为我找不到用于检查客户端是否已连接的内置函数)

     let kafka = KafkaProducer.getInstance();
     if (!kafka.isConnected) {
       await kafka.connect();
     }
     await kafka.producer.send({
       topic: 'topicName',
       messages: [
         {
           value: JSON.stringify(valueObj),
         },
       ],
     });
Run Code Online (Sandbox Code Playgroud)

并在应用程序关闭之前断开连接

try {
      let kafka = KafkaProducer.getInstance();
      if (kafka.isConnected) {
        let producer = kafka.producer;
        await producer.disconnect();
      }
    } catch (err) {
      console.error(err);
    }
Run Code Online (Sandbox Code Playgroud)