相关疑难解决方法(0)

Spark 2.0 Dataset vs DataFrame

从spark 2.0.1开始我有一些问题.我阅读了很多文档,但到目前为止找不到足够的答案:

  • 有什么区别
    • df.select("foo")
    • df.select($"foo")
  • 我能正确理解吗
    • myDataSet.map(foo.someVal)是类型安全的,不会转换为RDD但保留在DataSet表示/没有额外的开销(2.0.0的性能明智)
  • 所有其他命令,例如select,..只是语法糖.它们不是类型安全的,可以使用地图代替.如果df.select("foo")没有地图声明,我怎么能输入?
    • 为什么我应该使用UDF/UADF而不是地图(假设地图保留在数据集表示中)?

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0

23
推荐指数
1
解决办法
4687
查看次数

Parquet过滤器下推功能不适用于Spark Dataset API

这是我正在运行的示例代码.

使用mod列作为分区创建测试镶木地板数据集.

scala> val test = spark.range(0 , 100000000).withColumn("mod", $"id".mod(40))
test: org.apache.spark.sql.DataFrame = [id: bigint, mod: bigint]

scala> test.write.partitionBy("mod").mode("overwrite").parquet("test_pushdown_filter")
Run Code Online (Sandbox Code Playgroud)

之后,我将这些数据作为数据框架读取并在分区列上应用过滤器即mod.

scala> val df = spark.read.parquet("test_pushdown_filter").filter("mod = 5")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, mod: int]

scala> df.queryExecution.executedPlan
res1: org.apache.spark.sql.execution.SparkPlan =
*FileScan parquet [id#16L,mod#17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/kprajapa/WorkSpace/places/test_pushdown_filter], PartitionCount: 1, PartitionFilters: [
isnotnull(mod#17), (mod#17 = 5)], PushedFilters: [], ReadSchema: struct<id:bigint>
Run Code Online (Sandbox Code Playgroud)

你可以在执行计划中看到它只读取1个分区.

但是,如果您将相同的过滤器应用于数据集.它读取所有分区,然后应用过滤器.

scala> case class Test(id: Long, mod: Long)
defined class Test

scala> val ds = spark.read.parquet("test_pushdown_filter").as[Test].filter(_.mod==5)
ds: …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql apache-spark-dataset catalyst-optimizer

5
推荐指数
0
解决办法
2342
查看次数

Spark 不推送过滤器(PushedFilters 数组为空)

介绍

我注意到我们项目中的所有推送过滤器都不起作用。它解释了为什么执行时间受到影响,因为它读取了数百万次读取,而本应将其减少到数千次。为了调试问题,我编写了一个小测试,它读取一个 CSV 文件,过滤内容(PushDown Filter)并返回结果。

它不适用于 CSV,因此我尝试读取镶木地板文件。它们都不起作用。

数据

people.csv文件具有以下结构:

first_name,last_name,city  // header
FirstName1,LastName1,Bern // 1st row
FirstName2,LastName2,Sion // 2nd row
FirstName3,LastName3,Bulle // 3rd row
Run Code Online (Sandbox Code Playgroud)

注意:镶木地板文件具有相同的结构

读取 CSV 文件

为了重现这个问题,我编写了一个最小的代码来读取一个 csv 文件并且应该只返回过滤后的数据。

读取 csv 文件并打印物理计划:

Dataset<Row> ds = sparkSession.read().option("header", "true").csv(BASE_PATH+"people.csv");
ds.where(col("city").equalTo("Bern")).show();
ds.explain(true);
Run Code Online (Sandbox Code Playgroud)

物理计划:

+-----------+---------+----+
|名字|姓氏|城市|
+-----------+---------+----+
|FirstName1|LastName1|Bern|
+------------+---------+----+

== 解析的逻辑计划 == 关系 [first_name#10,last_name#11,city#12] csv

== 分析的逻辑计划 == first_name: string, last_name: string, city: string Relation[first_name#10,last_name#11,city#12] csv

== 优化逻辑规划 == 关系[first_name#10,last_name#11,city#12] csv

== 物理计划 == *(1) FileScan csv [first_name#10,last_name#11,city#12] 批处理:false,格式:CSV,位置:InMemoryFileIndex[file:people.csv],PartitionFilters:[],PushedFilters : …

java csv apache-spark parquet

3
推荐指数
1
解决办法
1186
查看次数