Is it possible to specify data format conversion in AWS CloudFormation?

mic*_*cah 4 amazon-web-services aws-cloudformation amazon-kinesis-firehose

The AWS documentation makes it look as though you can specify a DataFormatConversionConfiguration for an AWS::KinesisFirehose::DeliveryStream, but there is no documentation on where that property should go. I tried adding it under Properties, under ExtendedS3DestinationConfiguration, under ProcessingConfiguration, and inside one of the Processors. Each time, CloudFormation complains:

The following resource(s) failed to update: [EventFirehose]. 12:24:32 UTC-0500

UPDATE_FAILED AWS::KinesisFirehose::DeliveryStream EventFirehose Encountered unsupported property DataFormatConversionConfiguration

Firehose's own documentation says:

If you want Kinesis Data Firehose to convert the format of your input data from JSON to Parquet or ORC, specify the optional DataFormatConversionConfiguration element in ExtendedS3DestinationConfiguration or ExtendedS3DestinationUpdate.
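
And that is one of the placements that was tried, roughly like this (a minimal hypothetical sketch; the bucket and role ARNs are placeholders):

EventFirehose:
  Type: AWS::KinesisFirehose::DeliveryStream
  Properties:
    DeliveryStreamType: DirectPut
    ExtendedS3DestinationConfiguration:
      BucketARN: arn:aws:s3:::example-bucket            # placeholder
      RoleARN: arn:aws:iam::123456789012:role/example   # placeholder
      DataFormatConversionConfiguration:                # rejected as unsupported
        Enabled: true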

What am I doing wrong?

Apo*_*eus 11

According to the SDK documentation, it should go inside ExtendedS3DestinationConfiguration or ExtendedS3DestinationUpdate. But according to the CloudFormation documentation, CloudFormation does not currently support this property. This kind of gap between CloudFormation and other AWS services is very common. A similar issue with AWS ECS is mentioned here: Serious bug in ECS service CloudFormation templates (which was recently resolved).

For now, you can make this update through the SDK, or wait some time for CloudFormation to catch up.

If you want Kinesis Data Firehose to convert the format of your input data from JSON to Parquet or ORC, specify the optional DataFormatConversionConfiguration element in ExtendedS3DestinationConfiguration or ExtendedS3DestinationUpdate.


Iho*_*nko 7

Here is how I solved this task. The Firehose stream writes data to an S3 bucket in Parquet format:

LogsCollectionDatabase:
  Type: AWS::Glue::Database
  Properties:
    DatabaseInput:
      Description: Database for Kinesis Analytics
      Name: !Ref DatabaseName
    CatalogId: !Ref AWS::AccountId

LogsCollectionTable:
  Type: AWS::Glue::Table
  DependsOn: LogsCollectionDatabase
  Properties:
    DatabaseName: !Ref LogsCollectionDatabase
    CatalogId: !Ref AWS::AccountId
    TableInput:
      Name: serverlogs
      Description: Table for storing logs from kinesis
      TableType: EXTERNAL_TABLE
      StorageDescriptor:
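        # Column names and types should match the fields of the incoming JSON records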
        Columns:
          - Type: string
            Name: col1
          - Type: string
            Name: col2
        Location: !Sub s3://${DestinationBucketName}/${DestinationBucketPrefix}
        InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
        OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
        SerdeInfo:
          SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe

KinesisFirehoseDeliveryStream:
  Type: AWS::KinesisFirehose::DeliveryStream
  DependsOn: FirehoseDeliveryIAMPolicy
  Properties:
    DeliveryStreamName: !Ref RegionalStreamName
    DeliveryStreamType: DirectPut
    ExtendedS3DestinationConfiguration:
      BucketARN: !Ref DestinationBucketArn
      Prefix: !Ref DestinationBucketPrefix
      BufferingHints:
        IntervalInSeconds: 60
        SizeInMBs: 64
      ErrorOutputPrefix: errors/
      RoleARN: !GetAtt FirehoseDeliveryIAMRole.Arn
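      # Convert incoming JSON records to Parquet, using the schema from the Glue table above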
      DataFormatConversionConfiguration:
        Enabled: true
        InputFormatConfiguration:
          Deserializer:
            HiveJsonSerDe: {}
        OutputFormatConfiguration:
          Serializer:
            ParquetSerDe: {}
        SchemaConfiguration:
          CatalogId: !Ref AWS::AccountId
          RoleARN: !GetAtt FirehoseDeliveryIAMRole.Arn
          DatabaseName: !Ref LogsCollectionDatabase
          TableName: !Ref LogsCollectionTable
          Region: !Ref AWS::Region
          VersionId: LATEST

Of course, you also need to define the IAM role and policy for the Firehose stream.
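
A minimal sketch of what those two resources might look like, using the FirehoseDeliveryIAMRole and FirehoseDeliveryIAMPolicy names referenced above (the policy name and the exact permission set are assumptions based on what Firehose needs for S3 delivery and for reading the Glue table schema; adjust to your environment):

FirehoseDeliveryIAMRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Principal:
            Service: firehose.amazonaws.com
          Action: sts:AssumeRole

FirehoseDeliveryIAMPolicy:
  Type: AWS::IAM::Policy
  Properties:
    PolicyName: firehose-delivery-policy   # illustrative name
    Roles:
      - !Ref FirehoseDeliveryIAMRole
    PolicyDocument:
      Version: '2012-10-17'
      Statement:
        # S3 permissions for delivering objects to the destination bucket
        - Effect: Allow
          Action:
            - s3:AbortMultipartUpload
            - s3:GetBucketLocation
            - s3:GetObject
            - s3:ListBucket
            - s3:ListBucketMultipartUploads
            - s3:PutObject
          Resource:
            - !Ref DestinationBucketArn
            - !Sub ${DestinationBucketArn}/*
        # Glue permissions required by DataFormatConversionConfiguration to read the table schema
        - Effect: Allow
          Action:
            - glue:GetTable
            - glue:GetTableVersion
            - glue:GetTableVersions
          Resource: '*'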