在交付给S3之前,可以在Kinesis Firehose中自定义分区吗?

mow*_*nay 5 amazon-s3 amazon-kinesis-firehose

我有一个Firehose流,旨在吸收来自不同来源和不同事件类型的数百万个事件。流应将所有数据作为原始\未更改数据的存储传递到一个S3存储桶。

我正在考虑根据事件消息中嵌入的元数据(例如事件源,事件类型和事件日期)在S3中对数据进行分区。

但是,Firehose根据记录到达时间遵循其默认分区。是否可以自定义此分区行为以满足我的需求?

小智 15

截至撰写本文时,Vlad 提到的动态分区功能仍然很新。我需要它成为 CloudFormation 模板的一部分,但该模板仍然没有正确记录。我必须添加DynamicPartitioningConfiguration才能使其正常工作。MetadataExtractionQuery语法也没有正确记录。

  MyKinesisFirehoseStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    ...
    Properties:
      ExtendedS3DestinationConfiguration:
        Prefix: "clients/client_id=!{client_id}/dt=!{timestamp:yyyy-MM-dd}/"
        ErrorOutputPrefix: "errors/!{firehose:error-output-type}/"
        DynamicPartitioningConfiguration:
          Enabled: "true"
          RetryOptions:
            DurationInSeconds: "300"
        ProcessingConfiguration:
          Enabled: "true"
          Processors:
            - Type: AppendDelimiterToRecord
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: "{client_id:.client_id}"
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6

Run Code Online (Sandbox Code Playgroud)

  • 现在是 2022 年 4 月,我仍然没有找到有关 MetadataExtractionQuery 的文档。这个答案对于弄清楚语法确实很有帮助。谢谢! (5认同)

Vla*_*iev 13

自 2021 年 9 月 1 日起,AWS Kinesis Firehose 支持此功能。读公告博客文章

从文档中:

您可以使用 Key 和 Value 字段指定用作动态分区键的数据记录参数,并使用 jq 查询来生成动态分区键值。...

从 UI 来看是这样的:

在此输入图像描述 在此输入图像描述


Joh*_*ein 5

不可以。您不能根据事件内容“分区”。

一些选项是:

  • 发送到单独的 Firehose 流
  • 发送到 Kinesis Data Stream(而不是 Firehose)并编写您自己的自定义 Lambda 函数来处理和保存数据(请参阅:AWS 开发人员论坛:Athena 和 Kinesis Firehose
  • 使用 Kinesis Analytics 处理消息并将其“定向”到不同的 Firehose 流

如果您打算将输出与 Amazon Athena 或 Amazon EMR 一起使用,您还可以考虑将其转换为性能更好的Parquet 格式。这将需要将 S3 中的数据作为批处理进行后处理,而不是在数据到达流时对其进行转换。