如何在spark中对每个执行程序执行一次操作

Neh*_*eha 26 scala partitioning weka apache-spark

我有一个存储在S3中的weka模型,大小约为400MB.现在,我有一些记录,我想在其中运行模型并执行预测.

为了进行预测,我试过的是,

  1. 在驱动程序上下载并加载模型作为静态对象,将其广播给所有执行程序.对预测RDD执行映射操作.---->不工作,如在Weka中执行预测,需要修改模型对象,并且广播需要只读副本.

  2. 在驱动程序上下载并加载模型作为静态对象,并在每个映射操作中将其发送到执行程序.----->工作(效率不高,如在每个地图操作中,我传递400MB对象)

  3. 在驱动程序上下载模型并将其加载到每个执行程序上并将其缓存在那里.(不知道该怎么做)

有人知道如何在每个执行程序上加载模型一次并将其缓存,以便其他记录我不再加载它?

Dia*_*rat 27

您有两种选择:

1.使用表示数据的惰性val创建单个对象:

    object WekaModel {
        lazy val data = {
            // initialize data here. This will only happen once per JVM process
        }
    }       
Run Code Online (Sandbox Code Playgroud)

然后,您可以在map函数中使用lazy val .在lazy val每个工人JVM初始化自己的数据的情况下保证了.不会执行序列化或广播data.

    elementsRDD.map { element =>
        // use WekaModel.data here
    }
Run Code Online (Sandbox Code Playgroud)

好处

  • 更高效,因为它允许您为每个JVM实例初始化一次数据.例如,当需要初始化数据库连接池时,此方法是一个不错的选择.

缺点

  • 减少对初始化的控制.例如,如果需要运行时参数,初始化对象会比较棘手.
  • 如果需要,您无法真正释放或释放对象.通常,这是可以接受的,因为操作系统将在进程退出时释放资源.

2. 在RDD上使用mapPartition(或foreachPartition)方法而不是仅使用map.

这允许您初始化整个分区所需的任何内容.

    elementsRDD.mapPartition { elements =>
        val model = new WekaModel()

        elements.map { element =>
            // use model and element. there is a single instance of model per partition.
        }
    }
Run Code Online (Sandbox Code Playgroud)

优点:

  • 在对象的初始化和取消初始化方面提供更大的灵活性.

缺点

  • 每个分区都将创建并初始化对象的新实例.根据每个JVM实例的分区数量,它可能是也可能不是问题.

  • 你能用Java做同样的事吗? (4认同)
  • 你确定#1?我收到序列化错误.另外,如果数据初始化取决于运行时参数,您会怎么做? (2认同)
  • 关于这个缺点,"少控制初始化.例如,如果需要运行时参数,初始化对象会比较棘手.这正是我想要实现的目标.你有任何例子或者你看到过这个吗?我正在调用外部系统来获取数据库连接配置.理想情况下,我不想在每个执行程序上调用外部系统.我刚刚问了一个非常相似的问题./sf/ask/3306931771/ (2认同)