据我了解,Apache Spark 使用惰性求值。因此,例如如下仅包含转换的代码将不会进行实际处理:
val transformed_df = df.filter("some_field = 10").select("some_other_field", "yet_another_field")
Run Code Online (Sandbox Code Playgroud)
只有当我们执行“操作”时,才会真正发生任何处理:
transformed_df.show()
Run Code Online (Sandbox Code Playgroud)
我一直认为 Spark 中的加载操作也是懒惰的。(请参阅Spark 如何将数据加载到内存中。)
然而,我的 Spark 经验并没有证实这一点。当我做类似下面的事情时,
val df = spark.read.parquet("/path/to/parquet/")
Run Code Online (Sandbox Code Playgroud)
执行似乎很大程度上取决于路径中数据的大小。换句话说,它并不是严格意义上的懒惰。如果数据已分区并且我只需要查看一小部分分区,这会很不方便。
例如:
df.filter("partitioned_field = 10").show()
Run Code Online (Sandbox Code Playgroud)
如果数据在“partitioned_field”上的存储中进行分区,我预计spark会等到show()被调用,然后只读取“/path/to/parquet/partitioned_field=10/”下的数据。但话又说回来,情况似乎并非如此。Spark 似乎在调用 read 或 load 后立即对所有数据执行至少一些操作。
/path/to/parquet/partitioned_field=10/
我可以通过仅首先加载来解决这个问题,但这比仅调用“read”并在分区字段上进行过滤要差得多,而且更难概括。
是否有更优雅的首选方式来延迟加载镶木地板数据分区?
(澄清一下,我使用的是 Spark 2.4.3)
我想我在学习 Spark 中的惰性评估时经常被忽视的一个关键区别时偶然发现了我的问题的答案。
数据是延迟评估的,但模式不是。因此,如果我们正在读取 parquet(一种结构化数据类型),spark 至少必须在调用 read() 或 load() 后立即确定其正在读取的任何文件的架构。因此,对大量文件调用 read() 比对少量文件调用 read() 花费的时间更长。
鉴于分区是架构的一部分,现在 Spark 在过滤分区字段之前必须查看路径中的所有文件以确定架构,这对我来说并不奇怪。
如果 Spark 等到模式评估是绝对必要的并且能够在确定模式的其余部分之前过滤分区字段,那么对我的目的来说会很方便,但听起来情况并非如此。我相信 Dataset 对象总是必须有一个模式,所以我不确定是否有办法在不对 Spark 进行重大更改的情况下解决这个问题。
总之,如果我想避免评估整个数据存储库的模式,目前我唯一的选择似乎是传递我需要的分区的路径列表,而不是基本路径。
归档时间: |
|
查看次数: |
2876 次 |
最近记录: |