Spark Streaming textFileStream does not support wildcards

Ada*_*ter 11 hdfs apache-spark spark-streaming

I set up a simple test to stream text files from S3, and it works when I try something like

val input = ssc.textFileStream("s3n://mybucket/2015/04/03/")

Log files land in the bucket and everything works fine.

But if the files are in a subfolder, it does not find any file placed into the subfolder (and yes, I'm aware that HDFS doesn't actually use a folder structure)

val input = ssc.textFileStream("s3n://mybucket/2015/04/")

So I tried simply using wildcards, as I have done before in a standard Spark application

val input = ssc.textFileStream("s3n://mybucket/2015/04/*")

But when I try that, it throws an error

java.io.FileNotFoundException: File s3n://mybucket/2015/04/* does not exist.
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1483)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1523)
at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:176)
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:134)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
.....

I know you can use wildcards when reading file input in a standard Spark application, but it appears that for streaming input it neither does that nor automatically picks up files in subfolders. Is there something I'm missing here?

Ultimately what I need is a streaming job running 24/7 that monitors an S3 bucket where logs are placed by date

So something like

s3n://mybucket/<YEAR>/<MONTH>/<DAY>/<LogfileName>

Is there a way to hand it the topmost folder and have it automatically read files that show up in any folder (since the date obviously advances every day)?

EDIT

So after digging through the documentation at http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources, it states that nested directories are not supported.

Can anyone shed some light on why that is?

Also, since my files will be nested by date, what would be a good way to work around this in my streaming application? It's a little complicated, because the logs take a few minutes to be written to S3, so the last file written for a given day can land in the previous day's folder even though we're a few minutes into the new day.

Mic*_*iov 7

You can create an "ugly but working" solution by extending FileInputDStream. Writing ssc.textFileStream(d) is equivalent to

new FileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString)

You can create a CustomFileInputDStream that extends FileInputDStream. The custom class copies the compute method from the FileInputDStream class and adjusts the findNewFiles method as needed.
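As orientation (this skeleton is not from the original answer), such a subclass might look roughly like the following. It is a sketch only: FileInputDStream is private[streaming], so the class has to live under Spark's own package, and the exact constructor signature varies by Spark version.

package org.apache.spark.streaming.dstream  // needed to reach private[streaming] members

import scala.reflect.ClassTag
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.streaming.StreamingContext

// Sketch: a subclass that re-implements only the file-discovery logic.
class CustomFileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
    ssc_ : StreamingContext,
    directory: String)
  extends FileInputDStream[K, V, F](ssc_, directory) {

  // Copy compute() from FileInputDStream here so that it calls the
  // adjusted findNewFiles() below instead of the private original.
}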

Change the findNewFiles method from:

  private def findNewFiles(currentTime: Long): Array[String] = {
    try {
      lastNewFileFindingTime = clock.getTimeMillis()

      // Calculate ignore threshold
      val modTimeIgnoreThreshold = math.max(
        initialModTimeIgnoreThreshold,   // initial threshold based on newFilesOnly setting
        currentTime - durationToRemember.milliseconds  // trailing end of the remember window
      )
      logDebug(s"Getting new files for time $currentTime, " +
        s"ignoring files older than $modTimeIgnoreThreshold")
      val filter = new PathFilter {
        def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
      }
      val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
      val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
      logInfo("Finding new files took " + timeTaken + " ms")
      logDebug("# cached file times = " + fileToModTime.size)
      if (timeTaken > slideDuration.milliseconds) {
        logWarning(
          "Time taken to find new files exceeds the batch size. " +
            "Consider increasing the batch size or reducing the number of " +
            "files in the monitored directory."
        )
      }
      newFiles
    } catch {
      case e: Exception =>
        logWarning("Error finding new files", e)
        reset()
        Array.empty
    }
  }

to:

  private def findNewFiles(currentTime: Long): Array[String] = {
    try {
      lastNewFileFindingTime = clock.getTimeMillis()

      // Calculate ignore threshold
      val modTimeIgnoreThreshold = math.max(
        initialModTimeIgnoreThreshold,   // initial threshold based on newFilesOnly setting
        currentTime - durationToRemember.milliseconds  // trailing end of the remember window
      )
      logDebug(s"Getting new files for time $currentTime, " +
        s"ignoring files older than $modTimeIgnoreThreshold")
      val filter = new PathFilter {
        def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
      }
      // Changed part: list the first-level subdirectories, then collect
      // the matching files from each of them.
      // (Needs imports: scala.collection.mutable.ArrayBuffer and
      // org.apache.hadoop.fs.FileStatus.)
      val directories = fs.listStatus(directoryPath).filter(_.isDirectory)
      val newFiles = ArrayBuffer[FileStatus]()

      directories.foreach(directory => newFiles.append(fs.listStatus(directory.getPath, filter) : _*))

      val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
      logInfo("Finding new files took " + timeTaken + " ms")
      logDebug("# cached file times = " + fileToModTime.size)
      if (timeTaken > slideDuration.milliseconds) {
        logWarning(
          "Time taken to find new files exceeds the batch size. " +
            "Consider increasing the batch size or reducing the number of " +
            "files in the monitored directory."
        )
      }
      newFiles.map(_.getPath.toString).toArray
    } catch {
      case e: Exception =>
        logWarning("Error finding new files", e)
        reset()
        Array.empty
    }
  }

This will check for files in all first-degree subfolders; you can adjust it to use the batch timestamp in order to access only the relevant "subdirectories".
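For example (a sketch only, assuming the s3n://mybucket/<YEAR>/<MONTH>/<DAY>/ layout from the question, with a hypothetical helper inside the custom class), findNewFiles could derive the dated folders from the batch time instead of listing everything:

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.fs.Path

// Sketch of a hypothetical helper: map a batch timestamp to the dated
// subdirectories worth scanning; root is the monitored top-level path,
// e.g. s3n://mybucket. Including the previous day covers logs that land
// in yesterday's folder a few minutes after midnight (the edge case
// raised in the question).
def dirsForBatch(root: Path, currentTime: Long): Seq[Path] = {
  val fmt = new SimpleDateFormat("yyyy/MM/dd")
  val today = fmt.format(new Date(currentTime))
  val yesterday = fmt.format(new Date(currentTime - 24L * 60 * 60 * 1000L))
  // distinct in case both timestamps fall on the same day
  Seq(today, yesterday).distinct.map(day => new Path(root, day))
}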

I created the CustomFileInputDStream as I mentioned and activated it by calling:

new CustomFileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString)

It seems to behave as we expected.

When writing a solution like this, I have to add some points to consider:

  • You are breaking Spark's encapsulation and creating a custom class that you will have to maintain on your own as time passes.

  • I believe a solution like this is a last resort. If your use case can be implemented in a different way, it is usually better to avoid solutions like this.

  • If you have a lot of "subdirectories" on S3 and check each one of them, it will cost you.

  • It would be very interesting to know whether Databricks doesn't support nested files because of the possible performance penalty; maybe there is a deeper reason I haven't thought about.


Ert*_*maz 0

We had the same problem. We joined the subfolder names with commas.

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.streaming.api.java.JavaDStream;

List<String> paths = new ArrayList<>();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd");

try {
    Date start = sdf.parse("2015/02/01");
    Date end = sdf.parse("2015/04/01");

    Calendar calendar = Calendar.getInstance();
    calendar.setTime(start);

    // Build one path per day between start (inclusive) and end (exclusive).
    while (calendar.getTime().before(end)) {
        paths.add("s3n://mybucket/" + sdf.format(calendar.getTime()));
        calendar.add(Calendar.DATE, 1);
    }
} catch (ParseException e) {
    e.printStackTrace();
}

String joinedPaths = StringUtils.join(paths, ",");
// Assumes ssc is a JavaStreamingContext.
JavaDStream<String> input = ssc.textFileStream(joinedPaths);
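In a Scala application the same idea might look like the following (a sketch mirroring the Java snippet above; it relies on textFileStream accepting the comma-separated list the way this answer describes, and assumes ssc is a StreamingContext):

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

// Sketch: build one path per day between start (inclusive) and
// end (exclusive), then join them with commas.
val sdf = new SimpleDateFormat("yyyy/MM/dd")
val calendar = Calendar.getInstance()
calendar.setTime(sdf.parse("2015/02/01"))
val end: Date = sdf.parse("2015/04/01")

val paths = scala.collection.mutable.ListBuffer[String]()
while (calendar.getTime.before(end)) {
  paths += s"s3n://mybucket/${sdf.format(calendar.getTime)}"
  calendar.add(Calendar.DATE, 1)
}

val input = ssc.textFileStream(paths.mkString(","))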

I hope your problem gets solved this way.