何时缓存DataFrame?

Ale*_*zis 7 python apache-spark apache-spark-sql pyspark

我的问题是,我什么时候应该做dataframe.cache()以及何时有用?

另外,在我的代码中,我应该在注释行中缓存数据帧吗?

注意:我的数据帧是从Redshift DB加载的.

非常感谢

这是我的代码:

def sub_tax_transfer_pricing_eur_aux(manager, dataframe, seq_recs, seq_reservas, df_aux):
    df_vta = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_vta'])
    df_cpa = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_cpa'])

    dataframe = dataframe.filter(dataframe.seq_rec.isin(seq_recs)) \
        .filter(dataframe.seq_reserva.isin(seq_reservas))

    ##################################################
    #SHOULD I CACHE HERE df_vta, df_cpa and dataframe
    ##################################################

    dataframe = dataframe.join(df_vta, [dataframe.ind_tipo_imp_vta_fac == df_vta.ind_tipo_imp_vta,
                                        dataframe.cod_impuesto_vta_fac == df_vta.cod_impuesto_vta,
                                        dataframe.cod_clasif_vta_fac == df_vta.cod_clasif_vta,
                                        dataframe.cod_esquema_vta_fac == df_vta.cod_esquema_vta,
                                        dataframe.cod_empresa_vta_fac == df_vta.cod_emp_atlas_vta,
                                        ]).drop("ind_tipo_imp_vta", "cod_impuesto_vta", "cod_clasif_vta",
                                                "cod_esquema_vta", "cod_emp_atlas_vta") \
        .join(df_cpa, [dataframe.ind_tipo_imp_vta_fac == df_cpa.ind_tipo_imp_cpa,
                       dataframe.cod_impuesto_vta_fac == df_cpa.cod_impuesto_cpa,
                       dataframe.cod_clasif_vta_fac == df_cpa.cod_clasif_cpa,
                       dataframe.cod_esquema_vta_fac == df_cpa.cod_esquema_cpa,
                       dataframe.cod_empresa_vta_fac == df_cpa.cod_emp_atlas_cpa,
                       ]).drop("ind_tipo_imp_cpa", "cod_impuesto_cpa", "cod_clasif_cpa",
                               "cod_esquema_cpa", "cod_emp_atlas_cpa") \
        .select("seq_rec", "seq_reserva", "ind_tipo_regimen_fac", "imp_margen_canal", "ind_tipo_regimen_con",
                "imp_coste", "imp_margen_canco", "imp_venta", "pct_impuesto_vta", "pct_impuesto_cpa")

    ######################################         
    #SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################

    dataframe = dataframe.withColumn("amount1",
                                     func.when(dataframe.ind_tipo_regimen_fac == 'E',
                                               dataframe.imp_margen_canal * (
                                                   1 - (1 / (1 + (dataframe.pct_impuesto_vta
                                                                  / 100)))))
                                     .otherwise(dataframe.imp_venta * (
                                         1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
                                                    dataframe.imp_venta - dataframe.imp_margen_canal) * (
                                                    1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.withColumn("amount2",
                                     func.when(dataframe.ind_tipo_regimen_con == 'E',
                                               dataframe.imp_margen_canco * (
                                                   1 - (1 / (1 + (dataframe.pct_impuesto_vta
                                                                  / 100)))))
                                     .otherwise((dataframe.imp_coste + dataframe.imp_margen_canco) * (
                                         1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
                                                    dataframe.imp_coste) * (
                                                    1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.na.fill({'amount1': 0})
    dataframe = dataframe.na.fill({'amount2': 0})

    dataframe = dataframe.join(df_aux, [dataframe.seq_rec == df_aux.operative_incoming,
                                        dataframe.seq_reserva == df_aux.booking_id])

    dataframe = dataframe.withColumn("impuesto_canco1", udf_currency_exchange(dataframe.booking_currency,
                                                                             func.lit(EUR),
                                                                             dataframe.creation_date,
                                                                             dataframe.amount1))

    dataframe = dataframe.withColumn("impuesto_canco2", udf_currency_exchange(dataframe.booking_currency,
                                                                             func.lit(EUR),
                                                                             dataframe.creation_date,
                                                                             dataframe.amount2))

    dataframe = dataframe.withColumn("impuesto_canco", dataframe.impuesto_canco1 + dataframe.impuesto_canco2)

    dataframe = dataframe.na.fill({'impuesto_canco': 0})

    dataframe = dataframe.select("operative_incoming", "booking_id", "impuesto_canco")
    ######################################         
    #SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################
    dataframe = dataframe.groupBy("operative_incoming", "booking_id").agg({'impuesto_canco': 'sum'}). \
        withColumnRenamed("SUM(impuesto_canco)", "impuesto_canco")

    return dataframe
Run Code Online (Sandbox Code Playgroud)

Jac*_*ski 12

我什么时候应该做dataframe.cache()以及何时有用?

cache您将在查询中使用什么(以及早期和通常可用的内存).使用什么编程语言(Python或Scala或Java或SQL或R)并不重要,因为底层机制是相同的.

您可以使用explain运算符(其中InMemoryRelation实体反映具有其存储级别的缓存数据集)查看是否在物理计划中缓存了DataFrame :

== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 1, step=1, splits=Some(8))
Run Code Online (Sandbox Code Playgroud)

在您cache(或persist)您的DataFrame之后,第一个查询可能会变慢,但它会为以下查询付出代价.

您可以使用以下代码检查数据集是否已缓存:

scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false
Run Code Online (Sandbox Code Playgroud)

另外,在我的代码中,我应该在注释行中缓存数据帧吗?

是的,不是.缓存代表外部数据集的内容,这样您每次查询时都不需要支付跨网络传输数据的额外费用(在访问外部存储时).

不要缓存您只使用一次或易于计算的内容.否则,cache.


请注意缓存的内容,即Dataset缓存的内容,因为它会缓存不同的查询.

// cache after range(5)
val q1 = spark.range(5).cache.filter($"id" % 2 === 0).select("id")
scala> q1.explain
== Physical Plan ==
*Filter ((id#0L % 2) = 0)
+- InMemoryTableScan [id#0L], [((id#0L % 2) = 0)]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 5, step=1, splits=8)

// cache at the end
val q2 = spark.range(1).filter($"id" % 2 === 0).select("id").cache
scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#17L]
   +- InMemoryRelation [id#17L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Filter ((id#17L % 2) = 0)
            +- *Range (0, 1, step=1, splits=8)
Run Code Online (Sandbox Code Playgroud)

Spark SQL中的缓存有一个惊喜.缓存是懒惰的,这就是为什么你支付额外的价格让行缓存第一个动作,但这只发生在DataFrame API上.在SQL中,缓存非常迫切,这会使查询性能产生巨大差异,因为您没有调用操作来触发缓存.

  • @ravimalhotra 缓存数据集,除非您知道这是浪费时间:) 换句话说,始终缓存在同一作业中多次使用的数据帧。 (2认同)

Nan*_*esh 10

在 Spark 中缓存 RDD:这是一种加速多次访问同一个 RDD 的应用程序的机制。每次在该 RDD 上调用操作时,都会重新评估未缓存或检查点的 RDD。有两个用于缓存 RDD 的函数调用:cache()persist(level: StorageLevel). 它们之间的区别是cache()将RDD缓存到内存中,而persist(level)可以根据级别指定的缓存策略缓存在内存中、磁盘上或堆外内存中。 persist()没有参数等同于cache()。我们将在本文后面讨论缓存策略。从 Storage 内存中释放空间由 执行unpersist()

何时使用缓存:正如本文所建议的,建议在以下情况下使用缓存:

  • RDD 在迭代机器学习应用程序中的重用
  • RDD 在独立 Spark 应用程序中的重用
  • 当 RDD 计算很昂贵时,缓存可以帮助降低在一个执行器失败的情况下的恢复成本

  • “每次在该 RDD 上调用操作时,未缓存或未设置检查点的 RDD 都会重新评估”——整个主题中最重要的一句话。 (7认同)

Vai*_*nas 8

实际上,对于您而言,.cache()这完全没有帮助。您不在(至少不在您提供的函数中)数据框上执行任何操作。.cache()如果您将多次使用数据是一个好主意,例如:

data = sub_tax_transfer_pricing_eur_aux(...).cache()
one_use_case = data.groupBy(...).agg(...).show()
another_use_case = data.groupBy(...).agg(...).show()
Run Code Online (Sandbox Code Playgroud)

这样,您将只获取一次数据(调用第一个动作.show(),然后下次使用data数据帧时应该更快。但是,请谨慎使用-有时再次获取数据仍然会更快。此外,我建议不要命名相同的名称您的数据帧一遍又一遍。毕竟,数据帧是不可变的对象。

希望这会有所帮助。