我可以自动将换行添加到AWS Firehose记录吗?

MrH*_*Hen 16 amazon-kinesis amazon-kinesis-firehose

我正在尝试使用以下设置配置Kinesis Analytics应用程序:

  • 输入流是Kinesis Firehose,它采用字符串化的JSON值
  • SQL是一个简单的直通(以后需要更复杂但是为了测试,它只是通过发送数据)
  • 输出流是第二个Kinesis Firehose,它将记录传送到S3存储桶

接下来,我将使用Hive + JSONSERDE导入S3存储桶的内容,希望每个JSON记录都在自己的生产线上.Firehose输出只会附加所有打破JSONSERDE的JSON记录.

可以将AWS Lambda数据格式化程序附加到输出流,但这看起来很昂贵.我想要的是使用换行符分割每条记录.

如果我没有使用Google Analytics应用程序,我会将换行符附加到每个Firehose记录中.在应用程序的SQL中没有办法做到这一点似乎很奇怪:

CREATE OR REPLACE STREAM "STREAM_OUT" (
  a VARCHAR(4),
  b VARCHAR(4),
  c VARCHAR(4)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "STREAM_OUT"
    SELECT STREAM
      "a",
      "b",
      "c"
    FROM "SOURCE_SQL_STREAM_001";
Run Code Online (Sandbox Code Playgroud)

是添加Lambda数据格式化程序的最佳答案吗?我真的很想避免这种情况.

Lin*_*ina 5

我发布答案只是为了根据最近的 AWS 公告更新问题。AWS 最近宣布在 Kinesis Firehose Delivery 流上提供动态分区。它支持为每条记录添加新行字符。欲了解更多信息,请参阅


Sri*_* KN 4

我有类似的要求,需要向 firehose 生成的文件添加新行,在我们的应用程序中,firehose 是通过 API 网关调用的。

这是在集成请求部分下的正文映射模板中指定的。

API 网关中的以下命令会生成新行到 kinesis firehose 记录。

方法一:

    #set($payload="$input.path('$.Record.Data')
")
        {
            "DeliveryStreamName": "$input.path('$.DeliveryStreamName')",
            "Record": {
            "Data": "$util.base64Encode($payload)"
        }
        }
Run Code Online (Sandbox Code Playgroud)

如果您通过 API 网关调用 firehose,这将非常有效。

感谢和问候, Srivignesh KN

  • 我在问题中提到过这一点。我宁愿不为此添加 Lambda 变压器。 (3认同)