Spark Streaming on an S3 directory

Bra*_*don 4 scala amazon-s3 amazon-web-services apache-spark spark-streaming

So I have thousands of events being streamed through Amazon Kinesis into SQS and then dumped into an S3 directory. Roughly every 10 minutes a new text file is created that dumps the data from Kinesis into S3. I would like to set up Spark Streaming so that it streams the new files being dropped into S3. Right now I have

import org.apache.spark.streaming._
// ssc is a StreamingContext created earlier in the shell/app
val currentFileStream = ssc.textFileStream("s3://bucket/directory/event_name=accepted/")
currentFileStream.print()
ssc.start()

However, Spark Streaming is not picking up the new files dumped into S3. I think this is related to the file-writing requirements:

The files must have the same data format.
The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.
Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.

Why isn't Spark Streaming picking up the new files? Is it because AWS creates the files in the directory instead of moving them into it? How can I make sure Spark picks up the files dumped into S3?
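To illustrate the atomic-move requirement quoted above, here is a minimal writer-side sketch, assuming you control the process that uploads to S3: upload each file to a staging prefix that Spark is not watching, and only move it into the watched directory once the upload is complete. The prefix names and the helper publishToWatchedDir are hypothetical. Note that on S3 a Hadoop rename is a server-side copy followed by a delete rather than a truly atomic move, but the file still only appears under the watched prefix once it is finished.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical writer-side helper: upload to a staging prefix first,
// then move the finished file into the directory that textFileStream watches.
// Assumes both prefixes live in the same bucket.
def publishToWatchedDir(hadoopConf: Configuration,
                        stagingFile: String,  // e.g. "s3n://bucket/staging/events-1234.txt"
                        watchedDir: String): Unit = {  // e.g. "s3n://bucket/directory/event_name=accepted"
  val src = new Path(stagingFile)
  val dst = new Path(watchedDir, src.getName)
  val fs  = FileSystem.get(new URI(watchedDir), hadoopConf)
  if (!fs.rename(src, dst)) {
    sys.error(s"Could not move $src to $dst")
  }
}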

Haf*_*did 6

To stream an S3 bucket, you need to provide the path to the bucket, and it will stream all the data from all the files in that bucket. Then, whenever a new file is created in this bucket, it will be streamed. If you append data to existing files that were already read, those new updates will not be read.

Here is a small snippet of working code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
val sc = new SparkContext(conf)

// Point the s3:// scheme at the native S3 filesystem and set the credentials
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)

// The fs.s3 keys above may be deprecated; the fs.s3n keys are the ones
// the s3n:// path below actually uses
hadoopConf.set("fs.s3n.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3n.awsSecretAccessKey", mySecretKey)

val ssc = new org.apache.spark.streaming.StreamingContext(sc, Seconds(60))
val lines = ssc.textFileStream("s3n://path to bucket")
lines.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
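If you want to do more than print the lines, remember that any transformations and output operations on the DStream must be registered before ssc.start() is called. As a small illustrative sketch (the output bucket name is made up), you could count each batch or write it back out:

// These go where lines.print() is above, i.e. before ssc.start()
val nonEmpty = lines.filter(_.nonEmpty)      // drop blank lines, if any
nonEmpty.count().print()                     // number of records per 60-second batch

// Or write each non-empty batch back to S3 under a hypothetical output prefix
nonEmpty.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"s3n://output-bucket/processed/batch-${time.milliseconds}")
  }
}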

Hope that helps.