在Spark 2.0中加载压缩的gzip压缩文件

fem*_*yte 7 apache-spark pyspark

如何在Spark 2.0上的Pyspark中加载gzip压缩的csv文件?

我知道可以按如下方式加载未压缩的csv文件:

spark.read.format("csv").option("header",          
                                "true").load("myfile.csv")
Run Code Online (Sandbox Code Playgroud)

要么

spark.read.option("header", "true").csv("myfile.csv")
Run Code Online (Sandbox Code Playgroud)

fem*_*yte 15

我刚刚发现以下内容适用于gzipped csv文件:

spark.read.option("header", "true").csv("myfile.csv")
Run Code Online (Sandbox Code Playgroud)

  • 您可以使用 `*` 通配符 - `df = spark.read.option("header", "true").csv("some_path/*.gz")`。它也适用于多个文件夹 - `df = Spark.read.option("header", "true").csv("some_path/*/*.gz")` (3认同)

Rom*_*ski 6

我不确定在这里写答案和提出这个问题时这是否发生了变化,但我想插入我的发现,以供我自己和其他也遇到同样问题的人将来参考。我正在将 GZIP 压缩的 CSV 文件加载到 Google 托管 Spark-As-A-Service 产品(又名“Dataproc”)内的 Spark 版本 2.4.7 和 python 版本 3.7.4 上的 PySpark DataFrame 中。如果您想进一步研究规格,则基础 Dataproc 映像版本为1.5-debian10 。

\n

我的问题是,如果所有输入都仍然是乱码,我就无法成功读取 CSV。我可以通过更改文件名的结尾来进行一个小调整,以便文件后缀为.gz,然后一切就完美了。这是重现该问题的代码。

\n
# This is a shell script to get a dummy file created with 2 different endings\necho \'foo,bar,baz\' > test.csv\ngzip test.csv\n# So now there are 2 files with 2 endings\ncp test.csv.gz test_csv\n
Run Code Online (Sandbox Code Playgroud)\n

然后,我可以运行 pyspark 作业,甚至运行交互式 pyspark 会话(如下图所示),以验证 Spark 不会智能地检测文件类型,因为它会查看文件名并根据其名称解释文件类型。

\n
$ pyspark\nPython 3.7.4 (default, Aug 13 2019, 20:35:49) \n[GCC 7.3.0] :: Anaconda, Inc. on linux\nType "help", "copyright", "credits" or "license" for more information.\nSetting default log level to "WARN".\nTo adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\nWelcome to\n      ____              __\n     / __/__  ___ _____/ /__\n    _\\ \\/ _ \\/ _ `/ __/  `_/\n   /__ / .__/\\_,_/_/ /_/\\_\\   version 2.4.7\n      /_/\n\nUsing Python version 3.7.4 (default, Aug 13 2019 20:35:49)\nSparkSession available as \'spark\'.\n>>> filename_noend = \'test_csv\'\n>>> filename_end = \'test.csv.gz\'\n>>> schema = \'field1 string,field2 string,field3 string\'\n>>> df_noend = spark.read.csv(path=filename_noend, schema=schema, header=False)\n>>> df_noend.show()\n+--------------------+-------------+------+\n|              field1|       field2|field3|\n+--------------------+-------------+------+\n\xef\xbf\xbd\xef\xbf\xbd\xef\xbf\xbd`test.cs...|\xef\xbf\xbd*.\xef\xbf\xbd+T+\n                      |  null|\n+--------------------+-------------+------+\n\n>>> df_end = spark.read.csv(path=filename_end, schema=schema, header=False)\n>>> df_end.show()\n+------+------+------+\n|field1|field2|field3|\n+------+------+------+\n|   foo|   bar|   baz|\n+------+------+------+\n>>> exit()\n
Run Code Online (Sandbox Code Playgroud)\n

遗憾的是,没有办法指定类似的东西compression=\'gzip\'或其他什么。因此,保存带有结尾的 gzip 压缩文件.gz,然后就可以开始了!

\n


Sha*_*kar 3

您可以使用spark.sparkContext.textFile("file.gz")

文件扩展名应该是.gz