我正在尝试使用以下设置配置Kinesis Analytics应用程序:
接下来,我将使用Hive + JSONSERDE导入S3存储桶的内容,希望每个JSON记录都在自己的生产线上.Firehose输出只会附加所有打破JSONSERDE的JSON记录.
我可以将AWS Lambda数据格式化程序附加到输出流,但这看起来很昂贵.我想要的是使用换行符分割每条记录.
如果我没有使用Google Analytics应用程序,我会将换行符附加到每个Firehose记录中.在应用程序的SQL中没有办法做到这一点似乎很奇怪:
CREATE OR REPLACE STREAM "STREAM_OUT" (
a VARCHAR(4),
b VARCHAR(4),
c VARCHAR(4)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "STREAM_OUT"
SELECT STREAM
"a",
"b",
"c"
FROM "SOURCE_SQL_STREAM_001";
Run Code Online (Sandbox Code Playgroud)
是添加Lambda数据格式化程序的最佳答案吗?我真的很想避免这种情况.
我目前正在使用 Athena 和Kinesis Firehose
, Glue Crawler
。Kinesis Firehose
正在将 JSON 保存到单行文件,如下所示
{"name": "Jone Doe"}{"name": "Jane Doe"}{"name": "Jack Doe"}
Run Code Online (Sandbox Code Playgroud)
但是我注意到 athena 查询select count(*) from db.names
返回 1 而不是 3。在搜索问题之后。我找到了以下文件。
文章说 JSON 文件文件应该用新行存储。
{"name": "Jone Doe"}
{"name": "Jane Doe"}
{"name": "Jack Doe"}
Run Code Online (Sandbox Code Playgroud)
是否有一些聪明的技巧可以在单行 JSON 文件上运行 athena 查询?
感谢@Constantine,AWS Athena 正在执行分布式处理。由于单行 JSON 文件没有分隔符,因此无法执行分布式处理。因此,您必须在保存文件之前对其进行转换。
Kinesis Firehose 提供使用 Lambda 的转换,我添加了以下转换,以便从 AWS Athena 查询数据。
?const addNewLine = (data) => {
const parsedData = JSON.parse(new Buffer.from(data,'base64').toString('utf8'));
return new Buffer.from(JSON.stringify(parsedData) + '\n').toString('base64') …
Run Code Online (Sandbox Code Playgroud)