Spark on Java - 在所有工作者上拥有静态对象的正确方法是什么

Roe*_*rel 6 java static apache-spark

我需要在Spark中的所有执行程序的函数中使用不可序列化的第三方类,例如:

JavaRDD<String> resRdd = origRdd
    .flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String t) throws Exception {

        //A DynamoDB mapper I don't want to initialise every time
        DynamoDBMapper mapper = new DynamoDBMapper(new AmazonDynamoDBClient(credentials));

        Set<String> userFav = mapper.load(userDataDocument.class, userId).getFav();

        return userFav;
    }
});
Run Code Online (Sandbox Code Playgroud)

我希望有一个静态DynamoDBMapper mapper,我为每个执行者初始化一次,并且能够一遍又一遍地使用它.

由于它不是可序列化的,我无法在驱动器中初始化它并进行广播.

注意:这是一个答案(在所有工作者上有一个静态对象的正确方法),但它仅适用于Scala.

Ale*_*spo 7

你可以使用mapPartitionforeachPartition.这是一个取自Learning Spark的片段

通过使用基于分区的操作,我们可以将连接池共享到此数据库,以避免设置许多连接,并重用我们的JSON解析器.如例6-10到6-12所示,我们使用mapPartitions()函数,它给出了输入RDD的每个分区中元素的迭代器,并期望我们返回结果的迭代器.

这允许我们初始化每个执行器的一个连接,然后迭代分区中的元素,但是你想要.这对于将数据保存到某些外部数据库或创建昂贵的可重用对象非常有用.

这是一个简单的scala示例,取自链接的书.如果需要,可以将其转换为java.这里只是展示mapPartition和foreachPartition的一个简单用例.

ipAddressRequestCount.foreachRDD { rdd => rdd.foreachPartition { partition =>
    // Open connection to storage system (e.g. a database connection)
    partition.foreach { item =>
    // Use connection to push item to system
    }
    // Close connection
    } 
}
Run Code Online (Sandbox Code Playgroud)

这是一个 java示例的链接.

  • @donald如果将连接池作为临时值添加到Object(单例),则可以.每个执行程序将初始化一次Object,并在引用它时在每个执行程序上创建连接池.您现在可以为每个分区重用该连接池. (2认同)