将AWS Lambda数据推送到Kinesis Stream

Har*_*ish 11 amazon-web-services amazon-kinesis aws-lambda

有没有办法将数据从Lambda函数推送到Kinesis流?我搜索过互联网,但没有找到任何与之相关的例子.

谢谢.

Max*_*era 11

是的,您可以将Lambda中的信息发送到Kinesis Stream,这很简单.确保您使用正确的权限运行Lambda.

  1. 创建一个名为kinesis.js的文件,该文件将提供一个"保存"功能,用于接收有效负载并将其发送到Kinesis Stream.我们希望能够在我们想要将数据发送到流的任何地方包含此"保存"功能.码:

const AWS = require('aws-sdk');
const kinesisConstant = require('./kinesisConstants'); //Keep it consistent
const kinesis = new AWS.Kinesis({
  apiVersion: kinesisConstant.API_VERSION, //optional
  //accessKeyId: '<you-can-use-this-to-run-it-locally>', //optional
  //secretAccessKey: '<you-can-use-this-to-run-it-locally>', //optional
  region: kinesisConstant.REGION
});

const savePayload = (payload) => {
//We can only save strings into the streams
  if( typeof payload !== kinesisConstant.PAYLOAD_TYPE) {
    try {
      payload = JSON.stringify(payload);
    } catch (e) {
      console.log(e);
    }
  }

  let params = {
    Data: payload,
    PartitionKey: kinesisConstant.PARTITION_KEY,
    StreamName: kinesisConstant.STREAM_NAME
  };

  kinesis.putRecord(params, function(err, data) {
    if (err) console.log(err, err.stack);
    else     console.log('Record added:',data);
  });
};

exports.save = (payload) => {
  const params = {
    StreamName: kinesisConstant.STREAM_NAME,
  };

  kinesis.describeStream(params, function(err, data) {
    if (err) console.log(err, err.stack);
    else {
      //Make sure stream is able to take new writes (ACTIVE or UPDATING are good)
      if(data.StreamDescription.StreamStatus === kinesisConstant.STATE.ACTIVE
        || data.StreamDescription.StreamStatus === kinesisConstant.STATE.UPDATING ) {
        savePayload(payload);
      } else {
        console.log(`Kinesis stream ${kinesisConstant.STREAM_NAME} is ${data.StreamDescription.StreamStatus}.`);
        console.log(`Record Lost`, JSON.parse(payload));
      }
    }
  });
};
Run Code Online (Sandbox Code Playgroud)

  1. 创建一个kinesisConstant.js文件以保持一致:)

module.exports = {
  STATE: {
    ACTIVE: 'ACTIVE',
    UPDATING: 'UPDATING',
    CREATING: 'CREATING',
    DELETING: 'DELETING'
  },
  STREAM_NAME: '<your-stream-name>',
  PARTITION_KEY: '<string-value-if-one-shard-anything-will-do',
  PAYLOAD_TYPE: 'String',
  REGION: '<the-region-where-you-have-lambda-and-kinesis>',
  API_VERSION: '2013-12-02'
}
Run Code Online (Sandbox Code Playgroud)

  1. 你的处理程序文件:我们添加了'done'函数来向想要将数据发送到流的人发送响应,但'kinesis.save(event)'完成所有工作.

const kinesis = require('./kinesis');

exports.handler = (event, context, callback) => {
  console.log('LOADING handler');
  
  const done = (err, res) => callback(null, {
    statusCode: err ? '400' : '200',
    body: err || res,
    headers: {
      'Content-Type': 'application/json',
    },
  });
  
  kinesis.save(event); // here we send it to the stream
  done(null, event);
}
Run Code Online (Sandbox Code Playgroud)


joh*_*hni 5

这应该像在计算机上一样完成。

这是一个例子nodejs

let aws = require('aws');
let kinesis = new aws.Kinesis();

// data that you'd like to send
let data_object = { "some": "properties" };
let data = JSON.stringify(data_object);

// push data to kinesis
const params = {
  Data: data,
  PartitionKey: "1",
  StreamName: "stream name"
}

kinesis.putRecord(params, (err, data) => {
  if (err) console.error(err);
  else console.log("data sent");
}
Run Code Online (Sandbox Code Playgroud)

请注意,这段代码将不起作用,因为它Lambda没有访问您的流的权限。通过访问AWS资源时Lambda,最好使用IAM角色;

  1. 配置新角色时Lambda,您可以选择现有/创建角色。
  2. 转到IAM,然后转到 角色,然后选择您分配给该函数的角色名称Lambda
  3. 添加相关权限(putRecord, putRecords)。

然后,测试Lambda.