How to cache a partitioned dataset and use it in multiple queries?

Jos*_*osé 5 java apache-spark apache-spark-sql

I have the following code:

dataset
  .distinct()
  .repartition(400)
  .persist(StorageLevel.MEMORY_ONLY())
  .createOrReplaceTempView("temp");
sqlContext.sql("select * from temp");

This is just an example. I need to run about 100 queries against the same entity, which is why I persist it. I assumed that when I queried temp it would hit the cached entity, but when I check the query details in the Spark UI I see that every query re-executes the repartition of temp, i.e. each query reads the dataset and runs the whole DAG again.
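To illustrate the pattern (a hypothetical Java sketch; the column names are just examples taken from the plans below):

for (String column : java.util.Arrays.asList("purchase_activity", "top_brand_1")) {
  // each of the ~100 queries reads one column's distinct values from the cached view
  sqlContext.sql("select distinct " + column + " from temp").show();
}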

---------- EDIT ----------

Here are the execution graph and the logical plans of the queries, which look identical to me. My expectation was that only the first query would execute all the required steps, and after that it would access the in-memory view directly.

I have checked sqlContext.isCached("temp") and it prints true.
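For reference, that check is a one-liner (assuming the sqlContext from the code above):

// returns true once the view is marked for caching, even before it is materialized
System.out.println(sqlContext.isCached("temp"));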

Query execution graph:

[screenshot: query execution DAG from the Spark UI]

First query plan

== Parsed Logical Plan ==
'Project [11 AS tenant_id#4958, cube_purchase_details AS cube_name#4959, purchase_activity AS field#4960, 'purchase_activity AS value#4961]
+- 'UnresolvedRelation `filter_temp`

== Analyzed Logical Plan ==
tenant_id: string, cube_name: string, field: string, value: string
Project [11 AS tenant_id#4958, cube_purchase_details AS cube_name#4959, purchase_activity AS field#4960, purchase_activity#4062 AS value#4961]
+- SubqueryAlias filter_temp, `filter_temp`
   +- Aggregate [purchase_activity#4062], [purchase_activity#4062]
      +- Project [purchase_activity#4062]
         +- Repartition 400, true
            +- GlobalLimit 10000
               +- LocalLimit 10000
                  +- Project [purchase_activity#4062, top_shop_1#4069, top_brand_1#4072, top_brand_2#4073, top_brand_3#4074, top_brand_4#4075, top_brand_5#4076, top_manufacturer_1#4077, top_manufacturer_2#4078, top_manufacturer_3#4079, top_manufacturer_4#4080, top_manufacturer_5#4081, top_product_category_1#4082, top_product_category_2#4083, top_product_category_3#4084, top_product_category_4#4085, top_product_category_5#4086, top_salesperson_1#4093, top_salesperson_2#4094, top_salesperson_3#4095, age_category#4109, inactive#4115, activity_id#4144, activity_name#4145, ... 67 more fields]
                     +- Relation[purchase_detail_id#3918,tenant_id#3919,purchase_detail_date#3920,purchase_detail_type#3921,user_id#3922,user_domain#3923,purchase_id#3924,purchase_date#3925,is_purchase#3926,year#3927,quarter#3928,month#3929,week#3930,weekday#3931,day#3932,former_purchase_id#3933,pd_shop_id#3934,customer_id#3935,loyalty_id#3936,quantity#3937,unit_price#3938,total_price#3939,discount#3940,currency#3941,... 219 more fields] parquet

Plan of the other queries

== Parsed Logical Plan ==
'Project [11 AS tenant_id#6816, cube_purchase_details AS cube_name#6817, top_brand_1 AS field#6818, 'top_brand_1 AS value#6819]
+- 'UnresolvedRelation `filter_temp`

== Analyzed Logical Plan ==
tenant_id: string, cube_name: string, field: string, value: string
Project [11 AS tenant_id#6816, cube_purchase_details AS cube_name#6817, top_brand_1 AS field#6818, top_brand_1#4072 AS value#6819]
+- SubqueryAlias filter_temp, `filter_temp`
   +- Aggregate [top_brand_1#4072], [top_brand_1#4072]
      +- Project [top_brand_1#4072]
         +- Repartition 400, true
            +- GlobalLimit 10000
               +- LocalLimit 10000
                  +- Project [purchase_activity#4062, top_shop_1#4069, top_brand_1#4072, top_brand_2#4073, top_brand_3#4074, top_brand_4#4075, top_brand_5#4076, top_manufacturer_1#4077, top_manufacturer_2#4078, top_manufacturer_3#4079, top_manufacturer_4#4080, top_manufacturer_5#4081, top_product_category_1#4082, top_product_category_2#4083, top_product_category_3#4084, top_product_category_4#4085, top_product_category_5#4086, top_salesperson_1#4093, top_salesperson_2#4094, top_salesperson_3#4095, age_category#4109, inactive#4115, activity_id#4144, activity_name#4145, ... 67 more fields]
                     +- Relation[purchase_detail_id#3918,tenant_id#3919,purchase_detail_date#3920,purchase_detail_type#3921,user_id#3922,user_domain#3923,purchase_id#3924,purchase_date#3925,is_purchase#3926,year#3927,quarter#3928,month#3929,week#3930,weekday#3931,day#3932,former_purchase_id#3933,pd_shop_id#3934,customer_id#3935,loyalty_id#3936,quantity#3937,unit_price#3938,total_price#3939,discount#3940,currency#3941,... 219 more fields] parquet

Here is a screenshot of the Spark UI Storage page as well, in case it helps:

[screenshot: Spark UI Storage tab showing the cached entry]

How can I access this persisted dataset from spark-sql?

Jac*_*ski 3

How can I access this persisted dataset from spark-sql?

Spark SQL reuses cached queries for you, as long as you reference the cached query in other queries. Use the explain operator to confirm it, along with the web UI's "Details for Job" page (under the Jobs tab).
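As a minimal sketch of that check (assuming the question's sqlContext): when the cache is picked up, the physical plan should contain an InMemoryTableScan over an InMemoryRelation, like the explain output at the end of this answer.

// if "temp" is cached, the plan shows InMemoryTableScan / InMemoryRelation
// instead of re-running the scan-repartition-distinct pipeline
sqlContext.sql("select * from temp").explain();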


A Dataset's persist and cache operators are lazy (contrary to SQL's CACHE TABLE statement), so after the following code the data is not truly cached yet:

dataset
  .distinct()
  .repartition(400)
  .persist(StorageLevel.MEMORY_ONLY())
  .createOrReplaceTempView("temp");
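By contrast, SQL's CACHE TABLE is eager by default, so an alternative sketch (reusing the question's temp view) is to cache through SQL, which marks and materializes the view in one step:

// CACHE TABLE is eager by default: it runs a job right away to populate the cache
sqlContext.sql("CACHE TABLE temp");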

persist(StorageLevel.MEMORY_ONLY()) is merely a hint for Spark SQL to cache the relation the next time an action is executed. That is why people follow the pattern of executing head or count right after it to trigger the caching, as sketched below.
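Putting it together, a minimal sketch under the question's setup (same dataset and sqlContext; only the count() call is new):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.storage.StorageLevel;

Dataset<Row> prepared = dataset
  .distinct()
  .repartition(400)
  .persist(StorageLevel.MEMORY_ONLY());
prepared.count();                         // eager action: materializes the cache now
prepared.createOrReplaceTempView("temp"); // later queries can hit the InMemoryRelation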

Once the table is persisted (and an action has run), you can see its corresponding cached RDD entry on the web UI's Storage tab.

Moreover, executing cache on an already-cached Dataset gives you a warning:

scala> q.distinct.repartition(5).cache.head
17/05/16 10:57:49 WARN CacheManager: Asked to cache already cached data.
res4: org.apache.spark.sql.Row = [2]

scala> q.cache()
17/05/16 10:59:54 WARN CacheManager: Asked to cache already cached data.
res6: q.type = [key: bigint]

I think you expected that the execution plan would somehow be shorter after caching, i.e. that the steps already executed would be removed from the plan. Is that correct?

If so, your understanding is partially correct. Partially, because although parts of the query plan still show up in the execution plan, once the query is executed the cached table (and the corresponding stages) are fetched from the cache rather than recomputed.

You can check it on the "Details for Job" pages of the jobs that participate in the query: the cached parts are marked with small green dots in the execution DAG.

[screenshot: execution DAG with the cached parts highlighted in green]

Quoting Understanding Your Apache Spark Application Through Visualization:

Second, one of the RDDs is cached in the first stage (denoted by the green highlight). Since the enclosing operation involves reading from HDFS, caching this RDD means future computations on this RDD can access at least a subset of the original file from memory instead of from HDFS.

You can also review the query and its cached data source with the explain operator:

scala> q.explain
== Physical Plan ==
InMemoryTableScan [key#21L]
   +- InMemoryRelation [key#21L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Project [(id#18L % 5) AS key#21L]
            +- *Range (0, 1000, step=1, splits=8)