使用 spark 读取 * 带有自定义扩展名 * 的压缩文件

asa*_*ica 4 scala apache-spark

我想RDD[String]使用相当于sc.textFile("path/to/file.Z").

除了我的文件扩展名 if not gzbut isZ相反,因此该文件不会被识别为被 gzip 压缩。

我无法重命名它们,因为它会破坏生产代码。我不想复制它们,因为它们很大而且很多。我想我可以使用某种符号链接,但我想先看看是否有使用 Scala/spark 的方法(我现在在我的本地 Windows 机器上)。

我怎样才能有效地阅读这个文件?

Mik*_*nte 6

这里有一个解决这个问题的解决方法http://arjon.es/2015/10/02/reading-compressed-data-with-spark-using-unknown-file-extensions/

相关部分:

...扩展 GzipCodec 并覆盖 getDefaultExtension 方法。

package smx.ananke.spark.util.codecs

import org.apache.hadoop.io.compress.GzipCodec

class TmpGzipCodec extends GzipCodec {

  override def getDefaultExtension(): String = ".gz.tmp" // You should change it to ".Z"

}
Run Code Online (Sandbox Code Playgroud)

现在我们刚刚注册了这个编解码器,在 SparkConf 上设置 spark.hadoop.io.compression.codecs:

val conf = new SparkConf()

// Custom Codec that process .gz.tmp extensions as a common Gzip format
conf.set("spark.hadoop.io.compression.codecs", "smx.ananke.spark.util.codecs.TmpGzipCodec")

val sc = new SparkContext(conf)

val data = sc.textFile("s3n://my-data-bucket/2015/09/21/13/*")
Run Code Online (Sandbox Code Playgroud)