Roe*_*rel 6 java static apache-spark
我需要在Spark中的所有执行程序的函数中使用不可序列化的第三方类,例如:
JavaRDD<String> resRdd = origRdd
.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String t) throws Exception {
//A DynamoDB mapper I don't want to initialise every time
DynamoDBMapper mapper = new DynamoDBMapper(new AmazonDynamoDBClient(credentials));
Set<String> userFav = mapper.load(userDataDocument.class, userId).getFav();
return userFav;
}
});
Run Code Online (Sandbox Code Playgroud)
我希望有一个静态DynamoDBMapper mapper,我为每个执行者初始化一次,并且能够一遍又一遍地使用它.
由于它不是可序列化的,我无法在驱动器中初始化它并进行广播.
注意:这是一个答案(在所有工作者上有一个静态对象的正确方法),但它仅适用于Scala.
你可以使用mapPartition或foreachPartition.这是一个取自Learning Spark的片段
通过使用基于分区的操作,我们可以将连接池共享到此数据库,以避免设置许多连接,并重用我们的JSON解析器.如例6-10到6-12所示,我们使用mapPartitions()函数,它给出了输入RDD的每个分区中元素的迭代器,并期望我们返回结果的迭代器.
这允许我们初始化每个执行器的一个连接,然后迭代分区中的元素,但是你想要.这对于将数据保存到某些外部数据库或创建昂贵的可重用对象非常有用.
这是一个简单的scala示例,取自链接的书.如果需要,可以将其转换为java.这里只是展示mapPartition和foreachPartition的一个简单用例.
ipAddressRequestCount.foreachRDD { rdd => rdd.foreachPartition { partition =>
// Open connection to storage system (e.g. a database connection)
partition.foreach { item =>
// Use connection to push item to system
}
// Close connection
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2661 次 |
| 最近记录: |