Kafkajs消费和生产批次问题

Вла*_*хин 9 apache-kafka kafkajs

我想在我的 Node.js 项目中使用 kafkajs。让我展示我的代码。

制作人:

const producer = kafka.producer();
await producer.connect();
const items = await getItems(); // getting somehow 5k items to produce 
await producer.send({
  topic: "items",
  messages: items.map(c => ({ value: JSON.stringify(c) })),
});
// even if I split here on chunks like this, in consumer I get batch with more than 100 items
/*
  const chunked = _.chunk(items, 100);
  for (var chunk of chunked) {
    await producer.send({
      topic: config.kafka.topics.tm.itemsToParse,
      messages: chunk.map(c => ({ value: JSON.stringify(c) })),
      headers: { from: "csv_parser" },
    });
  }
*/
Run Code Online (Sandbox Code Playgroud)

消费者:

const consumer = kafka.consumer({ groupId: "groupId" });
await consumer.connect();
await consumer.subscribe({ topic: "items" });

await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning }) => {
    for (chunk of _.chunk(batch.messages, 100)) {
      if (!isRunning()) break;
      // I can handle by 100 items
      let items = chunk.map(m => JSON.parse(m.value));
      /*
        handle items somehow, max ~30 sec
      */
      for (message of chunk) resolveOffset(message.offset); // took from example in kafkajs docs
      await heartbeat();
    }
  },
});
Run Code Online (Sandbox Code Playgroud)

一个实例可以处理 100 个项目(最多)。因此,现在如果一个消费者拿走这批 5k 件物品,其他人也不会拿走它(来自同一个 groupId,以并行处理它)。问题是:

  • 拆分批次以同时从许多消费者那里读取它是真的吗?
  • 我可以配置消费者批量消费指定数量的消息吗?
  • 生产商是否应该发送具有正确批量大小(按 100 件)的批次?=> 生产者必须适应消费者?
  • 如果我需要定义的批次大小,如何正确从生产者发送批次?