Gee*_*ddy 2 java queue multithreading runnable
我有一个 Runnable 即使条件设置为停止也不会停止。不知道为什么会这样。有时一些线程仍然活着,其他则不是。我已经让它运行了一个多小时,看看它是否会停止,但它不会。它与我传递参数的方式有关吗?
public class ParserWorker implements Runnable {
private BlockingQueue<String> queue = null;
private ZipReader zip = null;
public ParserWorker(BlockingQueue<String> queue, ZipReader zip) {
this.queue = queue;
this.zip = zip;
}
@Override
public void run() {
try {
while (!queue.isEmpty() || !zip.isClosed()) {
String line = queue.take();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Run Code Online (Sandbox Code Playgroud)
这是开始一切的课程。
public Start() throws InterruptedException {
File zipFile = new File("C:\\development\\data2.zip");
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
ZipReader zip = new ZipReader(queue, zipFile);
ParserWorker w1 = new ParserWorker(queue, zip);
ParserWorker w2 = new ParserWorker(queue, zip);
ParserWorker w3 = new ParserWorker(queue, zip);
ParserWorker w4 = new ParserWorker(queue, zip);
//Start reading zip file
Thread zipThread = new Thread(zip);
zipThread.start();
//Give a little pause to allow the queue to fill
Thread.sleep(1000);
//Starts the Consumable Threads
Thread t1 = new Thread(w1);
Thread t2 = new Thread(w2);
Thread t3 = new Thread(w3);
Thread t4 = new Thread(w4);
t1.start();
t2.start();
t3.start();
t4.start();
//waits until the zip file is closed.
while (!zip.isClosed()) {
Thread.sleep(5000);
}
//By this point the Zip file is closed, queue may still contain items
System.out.println("Queue isEmpty:" + queue.isEmpty() + ", Zip isClosed:" + zip.isClosed());
//Waits until the Queue is empty
while (!queue.isEmpty()) {
Thread.sleep(5000);
}
//By this point the Zip file is closed and the queue is empty.
System.out.println("Queue isEmpty:" + queue.isEmpty() + ", Zip isClosed:" + zip.isClosed());
while (t1.isAlive() || t2.isAlive() || t3.isAlive() || t4.isAlive()) {
System.out.println("T1 alive:" + t1.isAlive() + ", T2 alive:" + t2.isAlive() + ",T3 alive:" + t3.isAlive() + ",T4 alive:" + t4.isAlive());
Thread.sleep(5000);
}
System.out.println("Done");
Run Code Online (Sandbox Code Playgroud)
我已经包括了 ZipReader 进行澄清。
public class ZipReader implements Runnable{
private boolean closed = true;
private File zipFileName = null;
protected BlockingQueue<String> queue = null;
public ZipReader(BlockingQueue<String> queue, File zipFileName) {
this.queue = queue;
this.zipFileName = zipFileName;
}
public boolean isClosed() {
return closed;
}
@Override
public void run() {
String line = null;
long index = 1L;
ZipFile zipFile = null;
try {
System.out.println("Opening Zip file");
zipFile = new ZipFile(zipFileName);
closed = false;
System.out.println("Getting entries");
Enumeration<? extends ZipEntry> entries = zipFile.entries();
while (entries.hasMoreElements()) {
ZipEntry zipEntryObj = entries.nextElement();
System.out.println("Processing file: " + zipEntryObj.getName());
InputStream input = zipFile.getInputStream(zipEntryObj);
InputStreamReader isr = new InputStreamReader(input);
BufferedReader br = new BufferedReader(isr);
while ((line = br.readLine()) != null) {
// if (index++ % 10000 == 0) {
// System.out.println("Index:" + index);
// }
// System.out.println(line);
queue.put(line);
}
br.close();
System.out.println();
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("Closing zip file");
try {
if (zipFile != null) {
zipFile.close();
}
closed = true;
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
这是我得到的输出。
Opening Zip file
Getting entries
Processing file: data01.txt
Processing file: data02.txt
Processing file: data03.txt
Processing file: data04.txt
Processing file: data05.txt
Processing file: data06.txt
Processing file: data07.txt
Processing file: data08.txt
Processing file: data09.txt
Processing file: data10.txt
Closing zip file
Queue isEmpty:true, Zip isClosed:true
Queue isEmpty:true, Zip isClosed:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
Run Code Online (Sandbox Code Playgroud)
问题在于您while在ParserWorker课堂上的循环。我指的是:
while (!queue.isEmpty() || !zip.isClosed()) {
String line = queue.take();
}
Run Code Online (Sandbox Code Playgroud)
使用此代码,您会遇到并发问题。当调用的take方法时BlockingQueue,可能会发生两件事:
您的代码的问题在于您假设String line = queue.take();和!queue.isEmpty()是同时执行的,没有并发问题。但是,可能会(在您的情况下)会发生以下情况:
!queue.isEmpty()同时进行。对于两个线程,返回值都是true,因为队列不为空。queue.take()并获取这个单个元素,然后再次检查while条件并终止,因为队列还为空。queue.take()在一个空队列上调用,因为第一个线程已经从队列中获取了元素。出于这个原因,线程会阻塞,直到将新元素放入队列。但是,在某些时候这在您的场景中永远不会发生,因为所有行都被读取。这意味着该线程将永远被阻塞,因为没有其他线程会为其放置元素。有很多解决方案可以解决这个问题。一个简单的将是改变
while (t1.isAlive() || t2.isAlive() || t3.isAlive() || t4.isAlive()) {
System.out.println("T1 alive:" + t1.isAlive() + ", T2 alive:" + t2.isAlive() + ",T3 alive:" + t3.isAlive() + ",T4 alive:" + t4.isAlive());
Thread.sleep(5000);
}
Run Code Online (Sandbox Code Playgroud)
到
while (t1.isAlive() || t2.isAlive() || t3.isAlive() || t4.isAlive()) {
System.out.println("T1 alive:" + t1.isAlive() + ", T2 alive:" + t2.isAlive() + ",T3 alive:" + t3.isAlive() + ",T4 alive:" + t4.isAlive());
queue.put("");
Thread.sleep(5000);
}
Run Code Online (Sandbox Code Playgroud)
使用该行queue.put("");确保元素将被放置,直到所有线程终止。在您的情况下,这将最多执行 3 次。这不是最合适的解决方案,并且会使您的线程读取空字符串,但它是最简单的解决方案。如果这不起作用,您可以花一些时间来提出适合您需要的其他同步。
另一种解决方案可以使用队列作为锁来使用额外的同步:
while (!queue.isEmpty() || !zip.isClosed()) {
synchronized (queue) {
if(!queue.isEmpty()) {
String line = queue.take();
}
}
}
Run Code Online (Sandbox Code Playgroud)
通过这种方式可以确保检查队列是否为空,并在每个线程没有一起执行的情况下取一个元素,这样没有线程会尝试弹出一个空队列。你必须决定你喜欢哪种方法。第二个需要额外的同步,但不会强制您的线程弹出空字符串。
| 归档时间: |
|
| 查看次数: |
895 次 |
| 最近记录: |