sai*_*aif 5 amazon-s3 amazon-kinesis amazon-kinesis-firehose aws-cdk
我正在使用 kinesis 传输流发送流,从事件桥到 s3 存储桶。但我似乎找不到哪个类可以选择配置动态分区?
这是我的交付流代码:
new CfnDeliveryStream(this, `Export-delivery-stream`, {
s3DestinationConfiguration: {
bucketArn: bucket.bucketArn,
roleArn: kinesisFirehoseRole.roleArn,
prefix: `test/!{timestamp:yyyy/MM/dd}/`
}
});
Run Code Online (Sandbox Code Playgroud)
我已经在同一个问题上工作了几天,终于找到了工作。以下是如何在 CDK 中实现它的示例。简而言之,分区必须像您所做的那样启用,但您需要在所谓的processingConfiguration中设置密钥和.jq表达式。
我们传入的 json 数据看起来像这样:
{
"data":
{
"timestamp":1633521266990,
"defaultTopic":"Topic",
"data":
{
"OUT1":"Inactive",
"Current_mA":3.92
}
}
}
Run Code Online (Sandbox Code Playgroud)
CDK 代码如下所示:
const DeliveryStream = new CfnDeliveryStream(this, 'deliverystream', {
deliveryStreamName: 'deliverystream',
extendedS3DestinationConfiguration: {
cloudWatchLoggingOptions: {
enabled: true,
},
bucketArn: Bucket.bucketArn,
roleArn: deliveryStreamRole.roleArn,
prefix: 'defaultTopic=!{partitionKeyFromQuery:defaultTopic}/!{timestamp:yyyy/MM/dd}/',
errorOutputPrefix: 'error/!{firehose:error-output-type}/',
bufferingHints: {
intervalInSeconds: 60,
},
dynamicPartitioningConfiguration: {
enabled: true,
},
processingConfiguration: {
enabled: true,
processors: [
{
type: 'MetadataExtraction',
parameters: [
{
parameterName: 'MetadataExtractionQuery',
parameterValue: '{defaultTopic: .data.defaultTopic}',
},
{
parameterName: 'JsonParsingEngine',
parameterValue: 'JQ-1.6',
},
],
},
{
type: 'AppendDelimiterToRecord',
parameters: [
{
parameterName: 'Delimiter',
parameterValue: '\\n',
},
],
},
],
},
},
})
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2857 次 |
| 最近记录: |