批处理PubSub请求

Eri*_*orn 2 google-cloud-pubsub

用于批处理pubsub请求的NODEJS示例代码如下所示:

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'your-topic';
// const data = JSON.stringify({ foo: 'bar' });
// const maxMessages = 10;
// const maxWaitTime = 10000;

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

pubsub
  .topic(topicName)
  .publisher({
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime,
    },
  })
  .publish(dataBuffer)
  .then(results => {
    const messageId = results[0];
    console.log(`Message ${messageId} published.`);
  })
  .catch(err => {
    console.error('ERROR:', err);
  });
Run Code Online (Sandbox Code Playgroud)

对我来说,如何使用此示例同时发布多个消息并不清楚.有人可以解释如何调整此代码,以便它可以用于同时发布多个消息吗?

Kam*_*osn 9

如果您想批量发送消息,那么您需要保留发布者并publish多次调用它.例如,您可以将代码更改为以下内容:

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();


const topicName = 'my-topic';
const maxMessages = 10;
const maxWaitTime = 10000;
const data1 = JSON.stringify({ foo: 'bar1' });
const data2 = JSON.stringify({ foo: 'bar2' });
const data3 = JSON.stringify({ foo: 'bar3' });

const publisher = pubsub.topic(topicName).publisher({
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime,
    },
  })

function handleResult(p) {
  p.then(results => {
    console.log(`Message ${results} published.`);
  })
  .catch(err => {
    console.error('ERROR:', err);
  });
}

// Publish three messages
handleResult(publisher.publish(Buffer.from(data1)));
handleResult(publisher.publish(Buffer.from(data2)));
handleResult(publisher.publish(Buffer.from(data3)));
Run Code Online (Sandbox Code Playgroud)

邮件的批处理由maxMessagesmaxMilliseconds属性处理.前者表示要包含在批处理中的最大消息数.后者表示等待发布批处理的最大毫秒数.这些属性与发布延迟的较大批次(可能更高效)进行权衡.如果您要快速发布许多消息,那么该maxMilliseconds属性将不会产生太大影响; 只要有10条消息准备就绪,客户端库就会向Cloud Pub/Sub服务发出发布请求.但是,如果发布是零星的或缓慢的,那么在有十条消息之前可能会发送一批消息.

在上面的示例代码中,我们调用publish了三条消息.这不足以填满批次并发送.因此,在第一次调用10,000毫秒后publish,这三条消息将作为批处理发送到Cloud Pub/Sub.