Ale*_*rev 7 bzip2 apache-flink
我有一个用bzip2压缩的维基百科转储(从http://dumps.wikimedia.org/enwiki/下载),但我不想解压缩它:我想在动态解压缩时处理它.
我知道可以用普通的Java做到这一点(参见例如Java - 读取BZ2文件并动态解压缩/解析),但我想知道它是如何在Apache Flink中做到的?我可能需要的是像https://github.com/whym/wikihadoop,但对于Flink,而不是Hadoop.
可以在Apache Flink中以下列格式读取压缩文件:
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec
Run Code Online (Sandbox Code Playgroud)
从包名称可以看出,Flink使用Hadoop的InputFormats来做到这一点.这是gz使用Flink的Scala API 读取文件的示例:(至少需要Flink 0.8.1)
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val job = new JobConf()
val hadoopInput = new TextInputFormat()
FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)
lines.print
env.execute("Read gz files")
}
Run Code Online (Sandbox Code Playgroud)
Apache Flink只有内置的.deflate文件支持.添加对更多压缩编解码器的支持很容易,但还没有完成.
在Flink中使用HadoopInputFormats不会导致任何性能损失.Flink具有针对Hadoop Writable类型的内置序列化支持.
| 归档时间: |
|
| 查看次数: |
659 次 |
| 最近记录: |