AWS Firehose 换行符

Mat*_*ake 8 amazon-web-services amazon-kinesis amazon-kinesis-firehose

我已经阅读了很多关于向 firehose 添加换行符的类似问题,但它们都围绕着将换行符添加到源代码中。问题是我无权访问源,第三方正在将数据传输到我们的 Kinesis 实例,我无法将 '\n' 添加到源。

我尝试使用以下代码进行流水数据转换:

'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => {
    /* Process the list of records and transform them */
    const output = [];
    event.records.forEach((record) => {
        const results = {
        /* This transformation is the "identity" transformation, the data is left intact */
            recordId: record.recordId,
            result: record.data.event_type === 'alert' ? 'Dropped' : 'Ok',
            data: record.data + '\n'
        };
        output.push(results);
    });
    console.log(`Processing completed.  Successful records ${output.length}.`);
    callback(null, { records: output });
};
Run Code Online (Sandbox Code Playgroud)

但换行符仍然丢失。我也试过,JSON.stringify(record.data) + '\n'但后来出现Invalid output structure错误。

Spi*_*iff 8

尝试解码 record.data 添加一个新行,然后将其再次编码为 base 64。

这是python,但想法是一样的

for record in event['records']:
    payload = base64.b64decode(record['data'])
    # Do custom processing on the payload here
    payload = payload + '\n'
    output_record = {
        'recordId': record['recordId'],
        'result': 'Ok',
        'data': base64.b64encode(json.dumps(payload))
    }
    output.append(output_record)
return {'records': output}
Run Code Online (Sandbox Code Playgroud)

来自@Matt Westlake 的评论:

对于那些寻找节点答案的人来说,它是

const data = JSON.parse(new Buffer.from(record.data,'base64').toString('utf8'));

new Buffer.from(JSON.stringify(data) + '\n').toString('base64')