如何将大型python模型应用于pyspark-dataframe?

Iva*_*ikh 6 python machine-learning apache-spark pyspark pyspark-sql

我有:

  • 包含某些数据(功能)的大型数据框(镶木地板格式,100.000.000行,4.5TB大小)
  • 几种大型ML模型(每个模型需要5-15GB的RAM)
  • Spark群集(AWS EMR),典型的节点配置是8 CPU,32 RAM,可以根据需要进行更改。

我想使用PySpark来应用它们,但是我总是会遇到一些有线错误,例如:

  • OOM
  • 随机超时(节点不返回任何结果)->节点被YARN管理器杀死

我通常使用类似的代码

def apply_model(partition):
    model = load(...)  # load model only when apply this function to avoid serialization issue
    for row in partition:
        yield model.infer(row)
Run Code Online (Sandbox Code Playgroud)

要么

def apply_model(partition):
    model = load(...)  # load model only when apply this function to 
    yield from model.infer(partition)
Run Code Online (Sandbox Code Playgroud)

并使用

df.select(...).rdd.mapPartitions(apply_model)

broadcast由于序列化的原因,我无法建模。

问题-如何应用基于python / any-non-jvm的大型模型来引发数据帧并避免引发异常?

Mic*_*tor 0

您的分区适合单个执行器的内存吗?您可以尝试增加分区数量,看看情况是否有所改善:

df.select(...).repartition(1000).rdd.mapPartitions(apply_model)
Run Code Online (Sandbox Code Playgroud)

通过查看 Spark UI 的指标来验证这是否是一项改进,例如:

  • 输入大小/记录
  • 随机写入大小/记录
  • 气相色谱时间

比较Median75th percentileMax值以查看您的数据是否不存在偏差。