我在mongoDB中有4000万个数据.我正在从集合中并行读取数据,处理它并转储到另一个集合中.
作业初始化的示例代码.
ExecutorService executor = Executors.newFixedThreadPool(10);
int count = total_number_of_records in reading collection
int pageSize = 5000;
int counter = (int) ((count%pageSize==0)?(count/pageSize):(count/pageSize+1));
for (int i = 1; i <= counter; i++) {
Runnable worker = new FinalParallelDataProcessingStrategyOperator(mongoDatabase,vendor,version,importDate,vendorId,i,securitiesId);
executor.execute(worker);
}
Run Code Online (Sandbox Code Playgroud)
每个线程都在做以下事情
public void run() {
try {
List<SecurityTemp> temps = loadDataInBatch();
populateToNewCollection(temps);
populateToAnotherCollection(temps);
} catch (IOException e) {
e.printStackTrace();
}
}
Run Code Online (Sandbox Code Playgroud)
使用以下查询对加载数据进行分页
mongoDB.getCollection("reading_collection").find(whereClause).
.skip(pagesize*(n-1)).limit(pagesize).batchSize(1000).iterator();
Run Code Online (Sandbox Code Playgroud)
机器配置:2个CPU,每个1核
并行实现提供与顺序几乎相同的性能.数据子集的统计数据(319568条记录)
No. of Threads Execution Time(minutes)
1 16
3 15
8 17
10 17
15 …Run Code Online (Sandbox Code Playgroud)