使用AWS Lambda连接S3中的文件

V. *_*mma 4 concatenation amazon-s3 aws-sdk aws-lambda amazon-kinesis-firehose

有没有办法使用Lambda进行S3文件串联?

我有Firehose以最长的间隔(15分钟或128mb)将数据流式传输到S3,因此我每天有96个数据文件,但是我想将所有数据聚合到一个日常数据文件中,以便在以后读取数据时获得最快的性能。在Spark(EMR)中。

我创建了一个解决方案,当Firehose将新文件流式传输到S3时,将调用Lambda函数。然后,该函数从源存储桶中读取(s3.GetObject)新文件,并从目标存储桶中读取已连接的每日数据文件(如果以前的每日数据已经存在,则创建一个新文件),将两个响应体都解码为字符串,然后只需将它们加在一起,然后使用s3.PutObject(覆盖先前的聚合文件)写入目标存储桶即可。

问题在于,当聚合文件达到150+ MB时,Lambda函数在读取两个文件时会达到其〜1500mb的内存限制,然后失败。

目前,我的数据量很少,每天只有几百MB-s,但是将来这个数字将成倍增长。对我来说,Lambda的限制如此之低,以至于文件太小它们已经达到了。

或者,最好是由S3对象创建的事件或某种方式调度的作业(例如每天调度的)调用串联S3数据的替代方法?

l0b*_*0b0 5

我会重新考虑您是否真的要这样做:

  • S3的成本将上升。
  • 管道的复杂性将上升。
  • 从Firehose输入到Spark输入的延迟将增加。
  • 如果将单个文件注入Spark失败(这将在分布式系统中发生),则您必须重新整理一个巨大的文件,如果注入不是原子的,则可以对其进行切片,然后再次上传,所有这些数据可能需要很长时间才能处理大量数据。此时,您可能会发现恢复时间太长,因此必须推迟下一次注射。

相反,除非在这种情况下是不可能的,否则,如果将Firehose文件设置得尽可能,然后立即将它们发送到Spark :

  • 您几乎可以立即归档S3对象,从而降低了成本。
  • 数据尽快在Spark中可用。
  • 如果将单个文件注入Spark失败,那么将有更少的数据需要重新整理,并且如果您具有自动恢复功能,那么即使在某些系统始终保持全速运行的情况下,这种恢复甚至也不会引起注意(此时,批量注入会更加糟糕)。
  • 建立TCP连接和身份验证只会增加很小的延迟。

我不特别了解Spark,但总的来说,这样的“管道式”解决方案涉及:

  • Firehose输出存储桶上的定期触发器或(甚至更好的)事件侦听器,以尽快处理输入。
  • 注入器/变压器,可将数据从S3有效地移动到Spark。听起来Parquet可以帮上忙。
  • 一个实时的Spark / EMR /底层数据服务实例,准备接收数据。
  • 如果是基础数据服务,则可以使用某种方式创建新的Spark集群以按需查询数据。

当然,如果不可能以合理的金额准备好Spark数据(但不可查询(“可查询”?我不知道)),那么这可能不是一个选择。注入小块数据也可能非常耗时,但这对于可用于生产环境的系统似乎不太可能。


如果确实需要将数据分块到每日转储中,则可以使用分段上传。相比之下,我们每分钟从Firehose进行少量文件处理(每天很多GB),而没有明显的开销。


小智 5

您可以创建一个 Lambda 函数,该函数每天仅使用计划事件调用一次,并且在您的 Lambda 函数中,您应该使用上传部分 - 复制,无需在 Lambda 函数上下载文件。这个线程中已经有一个这样的例子