Sim*_*Sim 15 compression gzip dataframe apache-spark apache-spark-sql
Apache Spark DataFrameReader.json()可以自动处理gzip压缩的JSONlines文件,但似乎没有办法DataFrameWriter.json()编写压缩的JSONlines文件.额外的网络I/O在云中非常昂贵.
有没有解决这个问题的方法?
nsa*_*tos 24
使用Spark 2.X(可能更早,我没有测试)有一种更简单的方法来编写压缩JSON,它不需要更改配置:
val df: DataFrame = ...
df.write.option("compression", "gzip").json("/foo/bar")
Run Code Online (Sandbox Code Playgroud)
这也适用于CSV和Parquet,在设置压缩选项后,只需使用.csv()和.parquet()而不是.json()来编写文件.
可能的编解码器是:none,bzip2,deflate,gzip,lz4和snappy.
gio*_*oca 13
以下解决方案使用pyspark,但我假设Scala中的代码类似.
第一个选项是在初始化SparkConf时设置以下内容:
conf = SparkConf()
conf.set("spark.hadoop.mapred.output.compress", "true")
conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")
Run Code Online (Sandbox Code Playgroud)
使用上面的代码,您使用该sparkContext生成的任何文件都会使用gzip自动压缩.
第二个选项,如果要仅压缩上下文中的选定文件.让我们说"df"是您的目标数据框和文件名:
df_rdd = self.df.toJSON()
df_rdd.saveAsTextFile(filename,compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
Run Code Online (Sandbox Code Playgroud)
在设置压缩选项SparkConf是不是一个很好的做法,作为公认的答案。它全局更改了行为,而不是逐个文件地指示设置。事实是,明确总是比隐含更好。在某些情况下,用户无法轻松操纵上下文配置,例如spark-shell或设计为另一个子模块的代码。
写作DataFrame与压缩,因为星火1.4的支持。实现该目标的几种方法:
df.write.json("filename.json", compression="gzip")
Run Code Online (Sandbox Code Playgroud)
而已!随便使用即可DataFrameWriter.json()。
魔术隐藏在代码中 pyspark/sql/readwriter.py
@since(1.4)
def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
"""Saves the content of the :class:`DataFrame` in JSON format
(`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
specified path.
:param path: the path in any Hadoop supported file system
:param mode: ...
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).
:param dateFormat: ...
:param timestampFormat: ...
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(
compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
self._jwrite.json(path)
Run Code Online (Sandbox Code Playgroud)
支持的压缩格式为bzip2,gzip,lz4,snappy和deflate,不区分大小写。
scala API应该相同。
df.write.options(compression="gzip").json("filename.json")
Run Code Online (Sandbox Code Playgroud)
与上面类似。可以提供更多选项作为关键字参数。自Spark 1.4起可用。
df.write.option("compression", "gzip").json("filename.json")
Run Code Online (Sandbox Code Playgroud)
DataFrameWriter.option()从Spark 1.5开始添加。一次只能添加一个参数。
| 归档时间: |
|
| 查看次数: |
13734 次 |
| 最近记录: |