Mat*_*ake 8 amazon-web-services amazon-kinesis amazon-kinesis-firehose
我已经阅读了很多关于向 firehose 添加换行符的类似问题,但它们都围绕着将换行符添加到源代码中。问题是我无权访问源,第三方正在将数据传输到我们的 Kinesis 实例,我无法将 '\n' 添加到源。
我尝试使用以下代码进行流水数据转换:
'use strict';
console.log('Loading function');
exports.handler = (event, context, callback) => {
/* Process the list of records and transform them */
const output = [];
event.records.forEach((record) => {
const results = {
/* This transformation is the "identity" transformation, the data is left intact */
recordId: record.recordId,
result: record.data.event_type === 'alert' ? 'Dropped' : 'Ok',
data: record.data + '\n'
};
output.push(results);
});
console.log(`Processing completed. Successful records ${output.length}.`);
callback(null, { records: output });
};
Run Code Online (Sandbox Code Playgroud)
但换行符仍然丢失。我也试过,JSON.stringify(record.data) + '\n'
但后来出现Invalid output structure
错误。
尝试解码 record.data 添加一个新行,然后将其再次编码为 base 64。
这是python,但想法是一样的
Run Code Online (Sandbox Code Playgroud)for record in event['records']: payload = base64.b64decode(record['data']) # Do custom processing on the payload here payload = payload + '\n' output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': base64.b64encode(json.dumps(payload)) } output.append(output_record) return {'records': output}
来自@Matt Westlake 的评论:
对于那些寻找节点答案的人来说,它是
const data = JSON.parse(new Buffer.from(record.data,'base64').toString('utf8'));
和
new Buffer.from(JSON.stringify(data) + '\n').toString('base64')
归档时间: |
|
查看次数: |
5112 次 |
最近记录: |