如何在Spark中处理增量S3文件

Him*_*ack 7 amazon-s3 bigdata apache-spark pyspark amazon-kinesis-firehose

我做了以下管道:任务管理器 - > SQS - >刮刀工人(我的应用程序) - > AWS Firehose - > S3文件 - > Spark - >(?)Redshift.

我试图解决/改进的一些事情,我很乐意获得指导:

  1. 刮刀可能会获得重复的数据,并将它们再次冲洗到firehose,这将导致重复的火花.我应该在开始计算之前使用Distinct函数在火花中解决这个问题吗?
  2. 我没有删除S3处理过的文件,因此数据变得越来越大.这是一个好习惯吗?(将s3作为输入数据库)或者我应该处理每个文件并在spark完成后删除它?目前我正在做sc.textFile("s3n://...../*/*/*")- 它将收集我的所有桶文件并运行计算.
  3. 要将结果放在Redshift(或s3)中 - >如何逐步执行此操作?也就是说,如果s3变得越来越大,红移将有重复的数据......我以前总是冲洗它吗?怎么样?

Shu*_*uan 0

我以前遇到过这些问题,尽管不是在单个管道中。这是我所做的。

  1. 删除重复项

    A。我使用BloomFilter来删除本地重复项。请注意,该文档相对不完整,但您可以轻松保存/加载/联合/相交布隆过滤器对象。您甚至可以reduce在过滤器上进行操作。

    b. 如果您直接将数据从 Spark 保存到 RedShift,您很可能需要花费一些时间和精力来更新当前批次的 BloomFilter、广播它,然后进行过滤以确保全局没有重复。之前我在 RDS 中使用了 UNIQUE 约束并忽略了该错误,但不幸的是RedShift 不遵守该约束

  2. 3. 数据变得越来越大

我使用 EMR 集群运行s3-dist-cp 命令来移动和合并数据(因为通常有很多小日志文件,这会影响 Spark 的性能)。如果您碰巧使用 EMR 来托管 Spark 集群,只需在分析之前添加一个步骤,将数据从一个存储桶移动到另一个存储桶。该步骤将command-runner.jar作为自定义 jar,命令如下所示

s3-dist-cp --src=s3://INPUT_BUCKET/ --dest=s3://OUTPUT_BUCKET_AND_PATH/ --groupBy=".*\.2016-08-(..)T.*" --srcPattern=".*\.2016-08.*" --appendToLastFile --deleteOnSuccess
Run Code Online (Sandbox Code Playgroud)

请注意,原始distcp不支持合并文件。

一般来说,您应该尽量避免将已处理和未处理的数据放在同一个存储桶(或至少是路径)中。