Apache Spark将文件作为HDFS的流读取

Mak*_*sym 7 java hdfs apache-spark

如何使用Apache Spark Java从hdfs读取文件作为流?我不想读取整个文件,我想拥有文件流以便在满足某些条件时停止读取文件,我该如何使用Apache Spark?

小智 1

您可以使用 ssc 方法使用流式 HDFS 文件

val ssc = new StreamingContext(sparkConf, Seconds(batchTime))

val dStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( StreamDirectory, (x: Path) => true, newFilesOnly = false)

使用上面的 api param filter 函数来过滤要处理的路径。

如果您的条件不是文件路径/名称并且基于数据,那么如果条件满足,您需要停止流上下文。

为此,您需要使用线程实现,1)在一个线程中,您需要不断检查流上下文是否已停止,如果 ssc 停止,则通知其他线程等待并创建新的流上下文。

2)在第二个线程中,您需要检查条件,如果条件满足则停止流上下文。

如果您需要解释,请告诉我。