ars*_*nal 5 java multithreading file bufferedreader
我正在尝试使用Java读取一个非常大的文件.那个大文件会有这样的数据,这意味着每一行都有一个用户ID.
149905320
1165665384
66969324
886633368
1145241312
286585320
1008665352
Run Code Online (Sandbox Code Playgroud)
在那个大文件中,将有大约3000万用户ID.现在我试图从那个大文件中一个一个地读取所有用户ID.意味着每个用户ID只能从该大文件中选择一次.例如,如果我有30万个用户ID,那么它应该使用多线程代码只打印一次3000万用户ID.
下面是我的代码,它是一个运行10个线程的多线程代码但是使用下面的程序,我无法确保每个用户ID只被选中一次.
public class ReadingFile {
public static void main(String[] args) {
// create thread pool with given size
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
service.submit(new FileTask());
}
}
}
class FileTask implements Runnable {
@Override
public void run() {
BufferedReader br = null;
try {
br = new BufferedReader(new FileReader("D:/abc.txt"));
String line;
while ((line = br.readLine()) != null) {
System.out.println(line);
//do things with line
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
任何人都可以帮我吗?我做错了什么?什么是最快的方法呢?
Zim*_*oot 17
你真的无法改进让一个线程按顺序读取文件,假设你没有做过像在多个磁盘上条带化文件那样的事情.使用一个线程,您可以执行一次搜索,然后执行一次长序列读取; 对于多个线程,您将使线程导致多次搜索,因为每个线程都获得对磁盘头的控制.
编辑:这是一种并行化处理的方法,同时仍然使用串行I/O来读取行.它使用BlockingQueue在线程之间进行通信; 的FileTask增加线到队列,并且CPUTask读取它们并对其进行处理.这是一个线程安全的数据结构,因此无需向其添加任何同步.您正在使用put(E e)向队列添加字符串,因此如果队列已满(它可以容纳最多200个字符串,如声明中所定义ReadingFile),则FileTask直到空间释放为止; 同样,您正在使用take()从队列中删除项目,因此CPUTask将阻止直到项目可用.
public class ReadingFile {
public static void main(String[] args) {
final int threadCount = 10;
// BlockingQueue with a capacity of 200
BlockingQueue<String> queue = new ArrayBlockingQueue<>(200);
// create thread pool with given size
ExecutorService service = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < (threadCount - 1); i++) {
service.submit(new CPUTask(queue));
}
// Wait til FileTask completes
service.submit(new FileTask(queue)).get();
service.shutdownNow(); // interrupt CPUTasks
// Wait til CPUTasks terminate
service.awaitTermination(365, TimeUnit.DAYS);
}
}
class FileTask implements Runnable {
private final BlockingQueue<String> queue;
public FileTask(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
BufferedReader br = null;
try {
br = new BufferedReader(new FileReader("D:/abc.txt"));
String line;
while ((line = br.readLine()) != null) {
// block if the queue is full
queue.put(line);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
class CPUTask implements Runnable {
private final BlockingQueue<String> queue;
public CPUTask(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
String line;
while(true) {
try {
// block if the queue is empty
line = queue.take();
// do things with line
} catch (InterruptedException ex) {
break; // FileTask has completed
}
}
// poll() returns null if the queue is empty
while((line = queue.poll()) != null) {
// do things with line;
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4293 次 |
| 最近记录: |