标签: kafkajs

卡夫卡 这个主题分区没有领导者,因为我们正在进行领导者选举

刚刚开始学习Kafka。我正在尝试设置一个包含 2 个代理的小型 kafka 集群。当两个经纪人都启动时,我成功地将消息发送到我的主题。我想测试当 2 个代理之一完成时集群的行为。我使用 docker stop kafka1 停止了我的主代理(Kafka1),然后我尝试向我的集群发送一条消息,看看我的生产者是否能够理解他需要发送到 kafka2,因为 kafka1 已关闭。

但是我不断收到以下错误:

{“level”:“ERROR”,“timestamp”:“2022-07-19T18:59:46.891Z”,“logger”:“kafkajs”,“message”:“[连接]响应元数据(键:3,版本: 6)","broker":"localhost:39092","clientId":"my-app","error":"这个主题分区没有领导者,因为我们正在进行领导者选举" ,“correlationId”:1,“大小”:144}

下面是我的生产者代码:

const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:29092', 'localhost:39092'],
})
const producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner })

await producer.connect()

await producer.send({
  topic: 'coverageEvolved',
  messages: [
    { value: JSON.stringify(bodyActiveMq), key: bodyActiveMq[0].roamPartner},
  ],
})

await producer.disconnect()
Run Code Online (Sandbox Code Playgroud)

下面是我的 docker-compose-file:

version: '2'
services:
    zookeeper:
    image: confluentinc/cp-zookeeper:latest
    restart: unless-stopped
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
    volumes:
      - ./zookeeper/data:/var/lib/zookeeper/data
    kafka-1:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - …
Run Code Online (Sandbox Code Playgroud)

javascript apache-kafka docker-compose kafkajs

10
推荐指数
1
解决办法
2万
查看次数

等待 KafkaJS 的领导选举

情况

我正在使用kafkajs写入一些动态生成的 kafka 主题。

我发现在注册我的制作人后立即写入这些主题通常会导致错误:There is no leader for this topic-partition as we are in the middle of a leadership election

完整的错误是:

{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}
Run Code Online (Sandbox Code Playgroud)

代码

这是导致问题的代码:

{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}
Run Code Online (Sandbox Code Playgroud)

问题

两个问题:

  1. 连接到生产者(或发送)时,我应该做些什么特别的事情,以确保逻辑阻塞,直到生产者真正准备好将数据发送到 kafka 主题?
  2. 在向生产者发送数据时,我应该做些什么特别的事情以确保消息不会被丢弃?

node.js apache-kafka kafkajs

9
推荐指数
1
解决办法
1万
查看次数

Kafkajs消费和生产批次问题

我想在我的 Node.js 项目中使用 kafkajs。让我展示我的代码。

制作人:

const producer = kafka.producer();
await producer.connect();
const items = await getItems(); // getting somehow 5k items to produce 
await producer.send({
  topic: "items",
  messages: items.map(c => ({ value: JSON.stringify(c) })),
});
// even if I split here on chunks like this, in consumer I get batch with more than 100 items
/*
  const chunked = _.chunk(items, 100);
  for (var chunk of chunked) {
    await producer.send({
      topic: config.kafka.topics.tm.itemsToParse,
      messages: chunk.map(c => ({ value: JSON.stringify(c) })),
      headers: { …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafkajs

9
推荐指数
0
解决办法
2411
查看次数

Kafka消费者手动偏移提交

在我的一个用例中,包括使用数据、执行一些操作并将其生成到新主题。

我正在使用https://www.npmjs.com/package/kafkajs npm 库。

我想在成功操作后手动提交偏移量以避免任何数据丢失。我用来autoCommit: false避免数据在使用后自动提交。

这是手动提交偏移量的代码

consumer.commitOffsets([
  { topic: 'topic-A', partition: 0, offset: '1' }
])
Run Code Online (Sandbox Code Playgroud)

正如我在某处读到的,如果我们有意提交每个偏移量(在消费后立即提交偏移量),那么它将在代理上创建负载,并且这样做不好。

我需要 kafka 专家的建议来针对我的上述用例提出最佳方法以避免任何数据丢失?请指教

producer-consumer node.js apache-kafka kafkajs

6
推荐指数
1
解决办法
4751
查看次数

LibrdKafkaError:经纪人:随机运行约 2 小时后未知成员

现在,我想实施node-rdkafka到我们的服务中,但我多次遇到此错误Broker: Unknown member。github 上的同一问题是https://github.com/confluenceinc/confluence-kafka-dotnet/issues/1464。他们说我们的消费者使用相同的组 ID 来重试或延迟。但我没有发现我的代码有任何重试和延迟。或https://github.com/confluenceinc/confluence-kafka-python/issues/1004,但我重新检查了所有消费者组 ID,它是唯一的。

生产者的配置node-rdkafka如下:

      this.producer = new Producer({
        "client.id": this.cliendID,
        "metadata.broker.list": this.brokerList,
        'compression.codec': "lz4",
        'retry.backoff.ms': 200,
        'socket.keepalive.enable': true,
        'queue.buffering.max.messages': 100000,
        'queue.buffering.max.ms': 1000,
        'batch.num.messages': 1000000,
        "transaction.timeout.ms": 2000,
        "enable.idempotence": false,
        "max.in.flight.requests.per.connection": 1,
        "debug": this.debug,
        'dr_cb': true,
        "retries": 0,
        "log_cb": (_: any) => console.log(`log_cb =>`, _),
        "sasl.username": this.saslUsername,
        "sasl.password": this.saslPassword,
        "sasl.mechanism": this.saslMechanism,
        "security.protocol": this.securityProtocol
      }, {
        "acks": -1
      })
Run Code Online (Sandbox Code Playgroud)

Consumer的配置node-rdkafka如下:

this.consumer = new KafkaConsumer({
        'group.id': this.groupID,
        'metadata.broker.list': …
Run Code Online (Sandbox Code Playgroud)

apache-kafka librdkafka kafkajs confluent-platform

5
推荐指数
0
解决办法
3607
查看次数

“组正在重新平衡,因此需要重新加入”错误导致消息被多次使用

我在 Kafkajs 消费者方面有优势,有时我会遇到重新平衡错误:

The group is rebalancing, so a rejoin is needed
[Connection] Response Heartbeat(key: 12, version: 3)
The group is rebalancing, so a rejoin is needed
[Runner] The group is rebalancing, re-joining
Run Code Online (Sandbox Code Playgroud)

然后,一旦消费者组重新平衡,就会再次处理最后处理的消息,因为由于错误而没有发生提交。

Kafka消费者初始化代码:

import { Consumer, Kafka } from 'kafkajs';


const kafkaInstance = new Kafka({
      clientId: 'some_client_id',
      brokers: ['brokers list'],
      ssl: true
    });

const kafkaConsumer = kafkaInstance.consumer({ groupId: 'some_consumer_group_id });
await kafkaConsumer.connect();
await kafkaConsumer.subscribe({ topic: 'some_topic', fromBeginning: true });

await kafkaConsumer.run({
      autoCommit: false, // cancel auto commit in …
Run Code Online (Sandbox Code Playgroud)

kafkajs

5
推荐指数
1
解决办法
1万
查看次数

无法使用 kafkajs 连接到种子代理

我正在尝试使用kafkajs以创建 kafka 消费者。但是,连接到 kafka 时我已经收到错误消息:

"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Closed connection"
Run Code Online (Sandbox Code Playgroud)

这是我正在使用的代码:

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: [
      "abc123f.xyz.cde.net:9094",
      "abc123h.xyz.cde.net:9094",
      "abc123k.xyz.cde.net:9094"
      ]
})
Run Code Online (Sandbox Code Playgroud)

有谁知道为什么会发生此错误或如何解决它?

node.js apache-kafka kafkajs

1
推荐指数
1
解决办法
6075
查看次数

NestJS:全局异常过滤器无法捕获基于 Kafka 的微服务应用程序抛出的任何内容

我们使用 NestJS 作为基于微服务的架构的 Typescript 框架。我们的一些部署被称为“Kafka 工作线程”,这些 pod 运行的代码实际上并不公开任何 REST 端点,而只是监听 kafka 主题并处理传入事件。

问题是,配置为希望捕获任何抛出异常的全局异常过滤器没有捕获任何内容(我们最终点头UnhandledPromiseRejection

异常过滤器的基本配置如下(遵循 NestJS 文档指南):

@Catch()
export class KafkaWorkerExceptionFilter implements ExceptionFilter {
  private logger: AppLogger = new AppLogger(KafkaWorkerExceptionFilter.name);

  catch(error: Error, host: ArgumentsHost): void {
    this.logger.error('Uncaught exception', error);
  }
}
Run Code Online (Sandbox Code Playgroud)

我们针对此类工作人员的控制器配置如下:

@Controller()
export class KafkaWorkerController {
  private readonly logger = new AppLogger(KafkaWorkerController.name);

  constructor(
  ) {
    this.logger.log('Init');
  }

  @EventPattern(KafkaTopic.PiiRemoval)
  async removePiiForTalent(data: IncomingKafkaMessage): Promise<void> {
    await asyncDoSomething();
    throw new Error('Business logic failed');
  }
}
Run Code Online (Sandbox Code Playgroud)

现在,我们期望全局异常过滤器捕获从控制器处理函数内部抛出的错误(以及从嵌套在其中进行同步/异步操作的实际函数抛出的实际错误)。这不会发生。


同样,按照 NestJS 文档实现此类过滤器,我尝试了多种方法以及“注册”该过滤器的方法组合,但没有成功:

  • 在顶级模块定义中列为提供者{ …

exception nestjs kafkajs nestjs-config nestjs-exception-filters

1
推荐指数
1
解决办法
2779
查看次数