Kar*_*mar 5 javascript node.js apache-kafka typescript microservices
我创建了这个类,以使代码在发送/生成 Kafka 消息时更加可重用和干净。
我正在使用Node 和 Kafka 使用 KafkaJS。
我对此很陌生,我无法在互联网上的任何地方找到在生产应用程序中使用它的完美/好方法。
问题(简而言之):每次生成新消息时,我们是否需要作为生产者进行连接?我们不能像Redis 或 NATS一样保持连接吗?
这是我到目前为止所尝试过的:
假设每次创建新用户时我都需要发送一条消息。
创建了kafka客户端,这样我们就不必每次都重新配置它
import { Kafka } from 'kafkajs';
class KafkaClient {
private _client: Kafka;
get client() {
if (!this._client) {
throw new Error('Cannot access client before initializing it');
} else {
return this._client;
}
}
connect(clientId: string, brokers: string[]) {
this._client = new Kafka({
clientId,
brokers,
});
}
}
export const producerClient = new KafkaClient();
Run Code Online (Sandbox Code Playgroud)
为所有类型的生产者创建了kafka生产者抽象类
import { Kafka, Message } from 'kafkajs';
import { Topics } from './topics';
interface Event {
topic: Topics;
data: Message;
}
export abstract class Publisher<T extends Event> {
private client: Kafka;
abstract topic: Topics;
constructor(client: Kafka) {
this.client = client;
}
async publish(data: T['data']): Promise<void> {
const producer = this.client.producer();
await producer.connect();
await producer.send({
topic: this.topic,
messages: [data],
});
}
}
Run Code Online (Sandbox Code Playgroud)
所有生产者都会有自己的这些类
import { Publisher } from './publisher';
import { Topics } from './topics';
interface Event {
topic: Topics.USER_CREATED;
data: {
value: string;
};
}
export class UserCreatedPublisher extends Publisher<Event> {
topic: Topics = Topics.USER_CREATED;
}
Run Code Online (Sandbox Code Playgroud)
在nodejs路由中使用它
import { Router } from 'express';
import { producerClient } from './kafka-client';
import { UserCreatedPublisher } from './user-created-publisher';
const router = Router();
router.post('/create-user', async (req, res) => {
const { email, password } = req.body;
// send this to the other service using kafka
await new UserCreatedPublisher(producerClient.client).publish({
value: JSON.stringify({ email, password }),
console.log('Message published');
res.status(201).send();
});
Run Code Online (Sandbox Code Playgroud)
问:每次生成新消息时,我们都需要作为生产者进行连接吗?我的回答:不
在这种情况下,我要做的是创建一个单例类来共享同一个 kafka 生产者实例,并在我优雅地关闭我的应用程序时断开它。
例如为kafka生产者创建一个类
import { Kafka } from 'kafkajs';
export class KafkaProducer {
private static instance: KafkaProducer;
private _producer = null;
private _isConnected = false;
private constructor() {
const kafka = new Kafka({
clientId: process.env.KAFKA_CLIENTID,
brokers: [process.env.KAFKA_SERVER],
});
this._producer = kafka.producer();
}
public static getInstance(): KafkaProducer {
if (!KafkaProducer.instance) {
KafkaProducer.instance = new KafkaProducer();
}
return KafkaProducer.instance;
}
public get isConnected() {
return this._isConnected;
}
async connect(): Promise<void> {
try {
await this._producer.connect();
this._isConnected = true;
} catch (err) {
console.error(err);
}
}
get producer() {
return this._producer;
}
}
Run Code Online (Sandbox Code Playgroud)
并使用此生产者在代码的其他位置发送消息(我创建了 isConnected() 方法,因为我找不到用于检查客户端是否已连接的内置函数)
let kafka = KafkaProducer.getInstance();
if (!kafka.isConnected) {
await kafka.connect();
}
await kafka.producer.send({
topic: 'topicName',
messages: [
{
value: JSON.stringify(valueObj),
},
],
});
Run Code Online (Sandbox Code Playgroud)
并在应用程序关闭之前断开连接
try {
let kafka = KafkaProducer.getInstance();
if (kafka.isConnected) {
let producer = kafka.producer;
await producer.disconnect();
}
} catch (err) {
console.error(err);
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2606 次 |
| 最近记录: |