刚刚开始学习Kafka。我正在尝试设置一个包含 2 个代理的小型 kafka 集群。当两个经纪人都启动时,我成功地将消息发送到我的主题。我想测试当 2 个代理之一完成时集群的行为。我使用 docker stop kafka1 停止了我的主代理(Kafka1),然后我尝试向我的集群发送一条消息,看看我的生产者是否能够理解他需要发送到 kafka2,因为 kafka1 已关闭。
但是我不断收到以下错误:
{“level”:“ERROR”,“timestamp”:“2022-07-19T18:59:46.891Z”,“logger”:“kafkajs”,“message”:“[连接]响应元数据(键:3,版本: 6)","broker":"localhost:39092","clientId":"my-app","error":"这个主题分区没有领导者,因为我们正在进行领导者选举" ,“correlationId”:1,“大小”:144}
下面是我的生产者代码:
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:29092', 'localhost:39092'],
})
const producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner })
await producer.connect()
await producer.send({
topic: 'coverageEvolved',
messages: [
{ value: JSON.stringify(bodyActiveMq), key: bodyActiveMq[0].roamPartner},
],
})
await producer.disconnect()
Run Code Online (Sandbox Code Playgroud)
下面是我的 docker-compose-file:
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
restart: unless-stopped
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
volumes:
- ./zookeeper/data:/var/lib/zookeeper/data
kafka-1:
image: confluentinc/cp-kafka:latest
depends_on:
- …Run Code Online (Sandbox Code Playgroud) 我正在使用kafkajs写入一些动态生成的 kafka 主题。
我发现在注册我的制作人后立即写入这些主题通常会导致错误:There is no leader for this topic-partition as we are in the middle of a leadership election。
完整的错误是:
{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}
Run Code Online (Sandbox Code Playgroud)
这是导致问题的代码:
{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}
Run Code Online (Sandbox Code Playgroud)
两个问题:
我想在我的 Node.js 项目中使用 kafkajs。让我展示我的代码。
制作人:
const producer = kafka.producer();
await producer.connect();
const items = await getItems(); // getting somehow 5k items to produce
await producer.send({
topic: "items",
messages: items.map(c => ({ value: JSON.stringify(c) })),
});
// even if I split here on chunks like this, in consumer I get batch with more than 100 items
/*
const chunked = _.chunk(items, 100);
for (var chunk of chunked) {
await producer.send({
topic: config.kafka.topics.tm.itemsToParse,
messages: chunk.map(c => ({ value: JSON.stringify(c) })),
headers: { …Run Code Online (Sandbox Code Playgroud) 在我的一个用例中,包括使用数据、执行一些操作并将其生成到新主题。
我正在使用https://www.npmjs.com/package/kafkajs npm 库。
我想在成功操作后手动提交偏移量以避免任何数据丢失。我用来autoCommit: false避免数据在使用后自动提交。
这是手动提交偏移量的代码
consumer.commitOffsets([
{ topic: 'topic-A', partition: 0, offset: '1' }
])
Run Code Online (Sandbox Code Playgroud)
正如我在某处读到的,如果我们有意提交每个偏移量(在消费后立即提交偏移量),那么它将在代理上创建负载,并且这样做不好。
我需要 kafka 专家的建议来针对我的上述用例提出最佳方法以避免任何数据丢失?请指教
现在,我想实施node-rdkafka到我们的服务中,但我多次遇到此错误Broker: Unknown member。github 上的同一问题是https://github.com/confluenceinc/confluence-kafka-dotnet/issues/1464。他们说我们的消费者使用相同的组 ID 来重试或延迟。但我没有发现我的代码有任何重试和延迟。或https://github.com/confluenceinc/confluence-kafka-python/issues/1004,但我重新检查了所有消费者组 ID,它是唯一的。
生产者的配置node-rdkafka如下:
this.producer = new Producer({
"client.id": this.cliendID,
"metadata.broker.list": this.brokerList,
'compression.codec': "lz4",
'retry.backoff.ms': 200,
'socket.keepalive.enable': true,
'queue.buffering.max.messages': 100000,
'queue.buffering.max.ms': 1000,
'batch.num.messages': 1000000,
"transaction.timeout.ms": 2000,
"enable.idempotence": false,
"max.in.flight.requests.per.connection": 1,
"debug": this.debug,
'dr_cb': true,
"retries": 0,
"log_cb": (_: any) => console.log(`log_cb =>`, _),
"sasl.username": this.saslUsername,
"sasl.password": this.saslPassword,
"sasl.mechanism": this.saslMechanism,
"security.protocol": this.securityProtocol
}, {
"acks": -1
})
Run Code Online (Sandbox Code Playgroud)
Consumer的配置node-rdkafka如下:
this.consumer = new KafkaConsumer({
'group.id': this.groupID,
'metadata.broker.list': …Run Code Online (Sandbox Code Playgroud) 我在 Kafkajs 消费者方面有优势,有时我会遇到重新平衡错误:
The group is rebalancing, so a rejoin is needed
[Connection] Response Heartbeat(key: 12, version: 3)
The group is rebalancing, so a rejoin is needed
[Runner] The group is rebalancing, re-joining
Run Code Online (Sandbox Code Playgroud)
然后,一旦消费者组重新平衡,就会再次处理最后处理的消息,因为由于错误而没有发生提交。
Kafka消费者初始化代码:
import { Consumer, Kafka } from 'kafkajs';
const kafkaInstance = new Kafka({
clientId: 'some_client_id',
brokers: ['brokers list'],
ssl: true
});
const kafkaConsumer = kafkaInstance.consumer({ groupId: 'some_consumer_group_id });
await kafkaConsumer.connect();
await kafkaConsumer.subscribe({ topic: 'some_topic', fromBeginning: true });
await kafkaConsumer.run({
autoCommit: false, // cancel auto commit in …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用kafkajs以创建 kafka 消费者。但是,连接到 kafka 时我已经收到错误消息:
"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Closed connection"
Run Code Online (Sandbox Code Playgroud)
这是我正在使用的代码:
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: [
"abc123f.xyz.cde.net:9094",
"abc123h.xyz.cde.net:9094",
"abc123k.xyz.cde.net:9094"
]
})
Run Code Online (Sandbox Code Playgroud)
有谁知道为什么会发生此错误或如何解决它?
我们使用 NestJS 作为基于微服务的架构的 Typescript 框架。我们的一些部署被称为“Kafka 工作线程”,这些 pod 运行的代码实际上并不公开任何 REST 端点,而只是监听 kafka 主题并处理传入事件。
问题是,配置为希望捕获任何抛出异常的全局异常过滤器没有捕获任何内容(我们最终点头UnhandledPromiseRejection)
异常过滤器的基本配置如下(遵循 NestJS 文档指南):
@Catch()
export class KafkaWorkerExceptionFilter implements ExceptionFilter {
private logger: AppLogger = new AppLogger(KafkaWorkerExceptionFilter.name);
catch(error: Error, host: ArgumentsHost): void {
this.logger.error('Uncaught exception', error);
}
}
Run Code Online (Sandbox Code Playgroud)
我们针对此类工作人员的控制器配置如下:
@Controller()
export class KafkaWorkerController {
private readonly logger = new AppLogger(KafkaWorkerController.name);
constructor(
) {
this.logger.log('Init');
}
@EventPattern(KafkaTopic.PiiRemoval)
async removePiiForTalent(data: IncomingKafkaMessage): Promise<void> {
await asyncDoSomething();
throw new Error('Business logic failed');
}
}
Run Code Online (Sandbox Code Playgroud)
现在,我们期望全局异常过滤器捕获从控制器处理函数内部抛出的错误(以及从嵌套在其中进行同步/异步操作的实际函数抛出的实际错误)。这不会发生。
同样,按照 NestJS 文档实现此类过滤器,我尝试了多种方法以及“注册”该过滤器的方法组合,但没有成功:
{ …exception nestjs kafkajs nestjs-config nestjs-exception-filters