从spark 2.0.1开始我有一些问题.我阅读了很多文档,但到目前为止找不到足够的答案:
df.select("foo")
df.select($"foo")
myDataSet.map(foo.someVal)
是类型安全的,不会转换为RDD
但保留在DataSet表示/没有额外的开销(2.0.0的性能明智)df.select("foo")
没有地图声明,我怎么能输入?
scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0
这是我正在运行的示例代码.
使用mod列作为分区创建测试镶木地板数据集.
scala> val test = spark.range(0 , 100000000).withColumn("mod", $"id".mod(40))
test: org.apache.spark.sql.DataFrame = [id: bigint, mod: bigint]
scala> test.write.partitionBy("mod").mode("overwrite").parquet("test_pushdown_filter")
Run Code Online (Sandbox Code Playgroud)
之后,我将这些数据作为数据框架读取并在分区列上应用过滤器即mod.
scala> val df = spark.read.parquet("test_pushdown_filter").filter("mod = 5")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, mod: int]
scala> df.queryExecution.executedPlan
res1: org.apache.spark.sql.execution.SparkPlan =
*FileScan parquet [id#16L,mod#17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/kprajapa/WorkSpace/places/test_pushdown_filter], PartitionCount: 1, PartitionFilters: [
isnotnull(mod#17), (mod#17 = 5)], PushedFilters: [], ReadSchema: struct<id:bigint>
Run Code Online (Sandbox Code Playgroud)
你可以在执行计划中看到它只读取1个分区.
但是,如果您将相同的过滤器应用于数据集.它读取所有分区,然后应用过滤器.
scala> case class Test(id: Long, mod: Long)
defined class Test
scala> val ds = spark.read.parquet("test_pushdown_filter").as[Test].filter(_.mod==5)
ds: …
Run Code Online (Sandbox Code Playgroud) apache-spark apache-spark-sql apache-spark-dataset catalyst-optimizer
介绍
我注意到我们项目中的所有推送过滤器都不起作用。它解释了为什么执行时间受到影响,因为它读取了数百万次读取,而本应将其减少到数千次。为了调试问题,我编写了一个小测试,它读取一个 CSV 文件,过滤内容(PushDown Filter)并返回结果。
它不适用于 CSV,因此我尝试读取镶木地板文件。它们都不起作用。
数据
该people.csv
文件具有以下结构:
first_name,last_name,city // header
FirstName1,LastName1,Bern // 1st row
FirstName2,LastName2,Sion // 2nd row
FirstName3,LastName3,Bulle // 3rd row
Run Code Online (Sandbox Code Playgroud)
注意:镶木地板文件具有相同的结构
读取 CSV 文件
为了重现这个问题,我编写了一个最小的代码来读取一个 csv 文件并且应该只返回过滤后的数据。
读取 csv 文件并打印物理计划:
Dataset<Row> ds = sparkSession.read().option("header", "true").csv(BASE_PATH+"people.csv");
ds.where(col("city").equalTo("Bern")).show();
ds.explain(true);
Run Code Online (Sandbox Code Playgroud)
物理计划:
+-----------+---------+----+
|名字|姓氏|城市|
+-----------+---------+----+
|FirstName1|LastName1|Bern|
+------------+---------+----+== 解析的逻辑计划 == 关系 [first_name#10,last_name#11,city#12] csv
== 分析的逻辑计划 == first_name: string, last_name: string, city: string Relation[first_name#10,last_name#11,city#12] csv
== 优化逻辑规划 == 关系[first_name#10,last_name#11,city#12] csv
== 物理计划 == *(1) FileScan csv [first_name#10,last_name#11,city#12] 批处理:false,格式:CSV,位置:InMemoryFileIndex[file:people.csv],PartitionFilters:[],PushedFilters : …