gfy*_*ytd 5 java caching dataframe apache-spark
我刚刚写了一个玩具类来测试 Spark 数据帧(实际上是数据集,因为我使用的是 Java)。
Dataset<Row> ds = spark.sql("select id,name,gender from test2.dummy where dt='2018-12-12'");
ds = ds.withColumn("dt", lit("2018-12-17"));
ds.cache();
ds.write().mode(SaveMode.Append).insertInto("test2.dummy");
//
System.out.println(ds.count());
Run Code Online (Sandbox Code Playgroud)
根据我的理解,有 2 个动作,“insertInto”和“count”。
我一步一步调试代码,运行“insertInto”时,看到几行:
19/01/21 20:14:56 INFO FileScanRDD: Reading File path: hdfs://ip:9000/root/hive/warehouse/test2.db/dummy/dt=2018-12-12/000000_0, range: 0-451, partition values: [2018-12-12]
Run Code Online (Sandbox Code Playgroud)
运行“count”时,我仍然看到类似的日志:
19/01/21 20:15:26 INFO FileScanRDD: Reading File path: hdfs://ip:9000/root/hive/warehouse/test2.db/dummy/dt=2018-12-12/000000_0, range: 0-451, partition values: [2018-12-12]
Run Code Online (Sandbox Code Playgroud)
我有两个问题:
1)当在同一个数据帧上有 2 个动作时,如果我不显式调用 ds.cache 或 ds.persist,第二个动作是否总是会导致重新执行 sql 查询?
2)如果我正确理解日志,这两个操作都会触发 hdfs 文件读取,这是否意味着 ds.cache() 实际上在这里不起作用?如果是这样,为什么它在这里不起作用?
非常感谢。
这是因为您附加到ds
创建的表中,因此ds
需要重新计算,因为基础数据已更改。在这种情况下,spark 会使缓存失效。如果您阅读例如此 Jira ( https://issues.apache.org/jira/browse/SPARK-24596 ):
使缓存失效时,我们会使依赖于该缓存的其他缓存失效,以确保缓存数据是最新的。例如,当底层表被修改或表本身被删除时,所有使用该表的缓存都应该失效或刷新。
尝试ds.count
在插入表之前运行。
归档时间: |
|
查看次数: |
1946 次 |
最近记录: |