Mik*_*kia 9 compression zip apache-spark
我看了一下星火对的支持gzip的 -kind输入文件在这里,我不知道如果同样支持不同类型的压缩文件,如存在.ZIP文件.到目前为止,我已经尝试计算压缩在zip文件下的文件,但Spark似乎无法成功读取其内容.
我已经采取了外观的Hadoop的newAPIHadoopFile和newAPIHadoopRDD,但到目前为止,我还没有能够得到任何工作.
此外,Spark支持为指定文件夹下的每个文件创建分区,如下例所示:
SparkConf SpkCnf = new SparkConf().setAppName("SparkApp")
.setMaster("local[4]");
JavaSparkContext Ctx = new JavaSparkContext(SpkCnf);
JavaRDD<String> FirstRDD = Ctx.textFile("C:\input\).cache();
Run Code Online (Sandbox Code Playgroud)
where C:\input\指向具有多个文件的目录.
在计算压缩文件的情况下,是否也可以将每个文件打包在一个压缩文件下,并按照每个文件一个分区的相同模式?
Spark的所有基于文件的输入方法(包括textFile)都支持在目录,压缩文件和通配符上运行.例如,您可以使用textFile("/ my/directory"),textFile("/ my/directory/.txt")和textFile("/ my/directory / .gz").
这可以通过提供有关Hadoop支持的压缩格式的信息来扩展,基本上可以通过查找所有扩展类来检查CompressionCodec(docs)
name | ext | codec class
-------------------------------------------------------------
bzip2 | .bz2 | org.apache.hadoop.io.compress.BZip2Codec
default | .deflate | org.apache.hadoop.io.compress.DefaultCodec
deflate | .deflate | org.apache.hadoop.io.compress.DeflateCodec
gzip | .gz | org.apache.hadoop.io.compress.GzipCodec
lz4 | .lz4 | org.apache.hadoop.io.compress.Lz4Codec
snappy | .snappy | org.apache.hadoop.io.compress.SnappyCodec
Run Code Online (Sandbox Code Playgroud)
因此,只需调用以下内容即可实现上述格式和更多可能性:
sc.readFile(path)
Run Code Online (Sandbox Code Playgroud)
不幸的是,zip默认情况下不在受支持的列表中.
我找到了一篇很棒的文章:Hadoop:在Map/Reduce中处理ZIP文件,以及一些解释(示例),解释了如何ZipFileInputFormat与sc.newAPIHadoopFileAPI 一起导入.但这对我不起作用.
如果没有任何外部依赖关系,您可以加载文件,sc.binaryFiles然后解压缩PortableDataStream读取内容.这是我选择的方法.
import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {
def readFile(path: String,
minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {
if (path.endsWith(".zip")) {
sc.binaryFiles(path, minPartitions)
.flatMap { case (name: String, content: PortableDataStream) =>
val zis = new ZipInputStream(content.open)
// this solution works only for single file in the zip
val entry = zis.getNextEntry
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}
} else {
sc.textFile(path, minPartitions)
}
}
}
Run Code Online (Sandbox Code Playgroud)
使用此隐式类,您需要导入它并调用readFile
方法SparkContext:
import com.github.atais.spark.Implicits.ZipSparkContext
sc.readFile(path)
Run Code Online (Sandbox Code Playgroud)
隐式类将zip正确加载您的文件并RDD[String]像以前一样返回.
注意:这仅适用于zip存档中的单个文件!
对于zip支持中的多个文件,请查看以下答案:https://stackoverflow.com/a/45958458/1549135
| 归档时间: |
|
| 查看次数: |
18758 次 |
| 最近记录: |