根据Learning Spark的说法
请记住,重新分区数据是一项相当昂贵的操作.Spark还有一个优化版本的repartition(),称为coalesce(),它允许避免数据移动,但前提是你减少了RDD分区的数量.
我得到的一个区别是,使用repartition()可以增加/减少分区数量,但是使用coalesce()时,只能减少分区数量.
如果分区分布在多台机器上并运行coalesce(),它如何避免数据移动?
我们正在运行spark 2.3.0 AWW EMR.以下DataFrame" df"非空且大小适中:
scala> df.count
res0: Long = 4067
Run Code Online (Sandbox Code Playgroud)
下面的代码工作正常写df到hdfs:
scala> val hdf = spark.read.parquet("/tmp/topVendors")
hdf: org.apache.spark.sql.DataFrame = [displayName: string, cnt: bigint]
scala> hdf.count
res4: Long = 4067
Run Code Online (Sandbox Code Playgroud)
但是,使用相同的代码写入本地parquet或csv文件最终结果为空:
df.repartition(1).write.mode("overwrite").parquet("file:///tmp/topVendors")
scala> val locdf = spark.read.parquet("file:///tmp/topVendors")
org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:207)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:207)
at scala.Option.getOrElse(Option.scala:121)
Run Code Online (Sandbox Code Playgroud)
我们可以看到它失败的原因:
ls -l /tmp/topVendors
total 0
-rw-r--r-- 1 hadoop hadoop 0 Jul 30 …Run Code Online (Sandbox Code Playgroud) 上周我很难从Spark中获取数据,最后我不得不随意使用
df.toPandas().to_csv('mycsv.csv')
Run Code Online (Sandbox Code Playgroud)
出于这个答案.
我测试了更多的原生
df.write.csv('mycsv.csv')
Run Code Online (Sandbox Code Playgroud)
对于Spark 2.0+但是根据下面的注释,它会丢弃一组csv文件而不是一个需要连接的文件,无论在这种情况下是什么意思.它还将一个空文件放入名为"success"的目录中.目录名是/ mycsv /,但csv本身的长字符串中有一个难以理解的名称.
这是我第一次听说过这样的事情.好吧,Excel有多个选项卡,必须以某种方式反映在.xls文件中,NumPy数组可以是多维的,但我认为csv文件只是一个标题,值由行中的逗号分隔成列.
query.repartition(1).write.csv("cc_out.csv", sep='|')
Run Code Online (Sandbox Code Playgroud)
因此,这只会删除一个文件和空白的"成功"文件,但该文件仍然没有您想要的名称,该目录的名称.
有谁知道为什么Spark会这样做,为什么它不会简单地输出一个csv,它如何命名csv,这个成功文件应包含什么,如果连接csv文件意味着在这里垂直连接它们,从头到尾.