bra*_*ana 23 json amazon-s3 amazon-web-services parquet amazon-kinesis-firehose
我想从kinesis firehose格式化为镶木地板的数据中摄取数据.到目前为止,我刚刚找到一个解决方案,意味着创建一个EMR,但我正在寻找更便宜和更快的东西,如直接从firehose存储收到的json作为镶木地板或使用Lambda函数.
非常感谢,Javi.
bra*_*ana 21
在处理AWS支持服务和一百种不同的实现之后,我想解释一下我所取得的成就.
最后,我创建了一个Lambda函数,用于处理Kinesis Firehose生成的每个文件,根据有效负载对事件进行分类,并将结果存储在S3中的Parquet文件中.
这样做并不容易:
首先,你应该创建一个Python虚拟环境,包括所有必需的库(在我的例子中是Pandas,NumPy,Fastparquet等).由于结果文件(包括所有库和我的Lambda函数很重,因此需要启动EC2实例,我使用了免费层中包含的那个).要创建虚拟环境,请执行以下步骤:
属性创建lambda_function:
import json
import boto3
import datetime as dt
import urllib
import zlib
import s3fs
from fastparquet import write
import pandas as pd
import numpy as np
import time
def _send_to_s3_parquet(df):
s3_fs = s3fs.S3FileSystem()
s3_fs_open = s3_fs.open
# FIXME add something else to the key or it will overwrite the file
key = 'mybeautifullfile.parquet.gzip'
# Include partitions! key1 and key2
write( 'ExampleS3Bucket'+ '/key1=value/key2=othervalue/' + key, df,
compression='GZIP',open_with=s3_fs_open)
def lambda_handler(event, context):
# Get the object from the event and show its content type
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'])
try:
s3 = boto3.client('s3')
response = s3.get_object(Bucket=bucket, Key=key)
data = response['Body'].read()
decoded = data.decode('utf-8')
lines = decoded.split('\n')
# Do anything you like with the dataframe (Here what I do is to classify them
# and write to different folders in S3 according to the values of
# the columns that I want
df = pd.DataFrame(lines)
_send_to_s3_parquet(df)
except Exception as e:
print('Error getting object {} from bucket {}.'.format(key, bucket))
raise e
Run Code Online (Sandbox Code Playgroud)将lambda函数复制到lambda.zip并部署lambda_function:
触发要在您喜欢时执行,例如,每次在S3中创建新文件,或者甚至可以将lambda函数与Firehose相关联.(我没有选择此选项,因为'lambda'限制低于Firehose限制,你可以配置Firehose每128Mb或15分钟写一个文件,但如果你将这个lambda函数关联到Firehose,lambda函数将被执行每3分钟或5MB,在我的情况下,我有生成很多小镶木地板文件的问题,因为每次启动lambda函数我生成至少10个文件).
Vla*_*iev 15
好消息,这个功能今天发布了!
Amazon Kinesis Data Firehose可以在将数据存储到Amazon S3之前将输入数据的格式从JSON转换为Apache Parquet或Apache ORC.Parquet和ORC是柱状数据格式,可节省空间并实现更快的查询
要启用,请转到Firehose流并单击Edit.你应该看到记录格式转换部分,如下面的截图:
有关详细信息,请参阅文档:https://docs.aws.amazon.com/firehose/latest/dev/record-format-conversion.html
Amazon Kinesis Firehose接收流式传输记录,并可将其存储在Amazon S3(或Amazon Redshift或Amazon Elasticsearch Service)中.
每条记录最多可达1000KB.

但是,记录将附加到文本文件中,并根据时间或大小进行批处理.传统上,记录是JSON格式.
您将无法发送镶木地板文件,因为它不符合此文件格式.
可以触发Lambda数据转换功能,但这也不能输出镶木地板文件.
实际上,鉴于镶木地板文件的性质,您不可能一次构建一条记录.作为一种柱状存储格式,我怀疑它们确实需要在批处理中创建,而不是每个记录都附加数据.
底线:不.
| 归档时间: |
|
| 查看次数: |
10553 次 |
| 最近记录: |