使用 BlockingQueue 时 Runnable 不会停止

Gee*_*ddy 2 java queue multithreading runnable

我有一个 Runnable 即使条件设置为停止也不会停止。不知道为什么会这样。有时一些线程仍然活着,其他则不是。我已经让它运行了一个多小时,看看它是否会停止,但它不会。它与我传递参数的方式有关吗?

public class ParserWorker implements Runnable {

    private BlockingQueue<String> queue = null;
    private ZipReader zip = null;

    public ParserWorker(BlockingQueue<String> queue, ZipReader zip) {
        this.queue = queue;
        this.zip = zip;
    }

    @Override
    public void run() {
        try {
            while (!queue.isEmpty() || !zip.isClosed()) {
                String line = queue.take();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这是开始一切的课程。

public Start() throws InterruptedException   {
    File zipFile = new File("C:\\development\\data2.zip");
    BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);

    ZipReader zip = new ZipReader(queue, zipFile);

    ParserWorker w1 = new ParserWorker(queue, zip);
    ParserWorker w2 = new ParserWorker(queue, zip);
    ParserWorker w3 = new ParserWorker(queue, zip);
    ParserWorker w4 = new ParserWorker(queue, zip);

    //Start reading zip file
    Thread zipThread = new Thread(zip);
    zipThread.start();

    //Give a little pause to allow the queue to fill
    Thread.sleep(1000);

    //Starts the Consumable Threads
    Thread t1 = new Thread(w1);
    Thread t2 = new Thread(w2);
    Thread t3 = new Thread(w3);
    Thread t4 = new Thread(w4);

    t1.start();
    t2.start();
    t3.start();
    t4.start();

    //waits until the zip file is closed.
    while (!zip.isClosed()) {
        Thread.sleep(5000);
    }

    //By this point the Zip file is closed, queue may still contain items
    System.out.println("Queue isEmpty:" + queue.isEmpty() + ", Zip isClosed:" + zip.isClosed());

    //Waits until the Queue is empty
    while (!queue.isEmpty()) {
        Thread.sleep(5000);
    }

    //By this point the Zip file is closed and the queue is empty.
    System.out.println("Queue isEmpty:" + queue.isEmpty() + ", Zip isClosed:" + zip.isClosed());

    while (t1.isAlive() || t2.isAlive() || t3.isAlive() || t4.isAlive()) {
        System.out.println("T1 alive:" + t1.isAlive() + ", T2 alive:" + t2.isAlive() + ",T3 alive:" + t3.isAlive() + ",T4 alive:" + t4.isAlive());
        Thread.sleep(5000);
    }


    System.out.println("Done");
Run Code Online (Sandbox Code Playgroud)

我已经包括了 ZipReader 进行澄清。

public class ZipReader implements Runnable{

    private boolean closed = true;
    private File zipFileName = null;

    protected BlockingQueue<String> queue = null;

    public ZipReader(BlockingQueue<String> queue, File zipFileName) {
        this.queue = queue; 
        this.zipFileName = zipFileName;
    }

    public boolean isClosed() {
        return closed;
    }

    @Override
    public void run() {
      String line = null;
      long index = 1L;
      ZipFile zipFile = null;

      try {
            System.out.println("Opening Zip file");
            zipFile = new ZipFile(zipFileName);
            closed = false;
            System.out.println("Getting entries");
            Enumeration<? extends ZipEntry> entries = zipFile.entries();
            while (entries.hasMoreElements()) {
                ZipEntry zipEntryObj = entries.nextElement();
                System.out.println("Processing file: " + zipEntryObj.getName());
                InputStream input = zipFile.getInputStream(zipEntryObj);
                InputStreamReader isr = new InputStreamReader(input);

                BufferedReader br = new BufferedReader(isr);
                while ((line = br.readLine()) != null) {
//                    if (index++ % 10000 == 0) {
//                        System.out.println("Index:" + index);
//                    }
//                    System.out.println(line);
                    queue.put(line);
                }
                br.close();
                System.out.println();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("Closing zip file");
            try {
                if (zipFile != null) {
                    zipFile.close();
                }
                closed = true;
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }

}
Run Code Online (Sandbox Code Playgroud)

这是我得到的输出。

Opening Zip file
Getting entries
Processing file: data01.txt
Processing file: data02.txt
Processing file: data03.txt
Processing file: data04.txt
Processing file: data05.txt
Processing file: data06.txt
Processing file: data07.txt
Processing file: data08.txt
Processing file: data09.txt
Processing file: data10.txt
Closing zip file
Queue isEmpty:true, Zip isClosed:true
Queue isEmpty:true, Zip isClosed:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
T1 alive:true, T2 alive:true,T3 alive:true,T4 alive:true
Run Code Online (Sandbox Code Playgroud)

Iva*_*kov 5

问题在于您whileParserWorker课堂上的循环。我指的是:

while (!queue.isEmpty() || !zip.isClosed()) {
  String line = queue.take();
}
Run Code Online (Sandbox Code Playgroud)

使用此代码,您会遇到并发问题。当调用的take方法时BlockingQueue,可能会发生两件事:

  1. 如果队列不为空,线程会以同步的方式获取下一个元素。这意味着没有两个线程可以获得相同的元素。
  2. 如果队列为空,则线程将等待直到放入元素。

您的代码的问题在于您假设String line = queue.take();!queue.isEmpty()是同时执行的,没有并发问题。但是,可能会(在您的情况下)会发生以下情况:

  1. 队列中只有一个元素。
  2. 两个线程检查是否!queue.isEmpty()同时进行。对于两个线程,返回值都是true,因为队列不为空。
  3. 第一个(更快的)线程调用queue.take()并获取这个单个元素,然后再次检查while条件并终止,因为队列还为空。
  4. 第二个(较慢的)线程queue.take()在一个空队列上调用,因为第一个线程已经从队列中获取了元素。出于这个原因,线程会阻塞,直到将新元素放入队列。但是,在某些时候这在您的场景中永远不会发生,因为所有行都被读取。这意味着该线程将永远被阻塞,因为没有其他线程会为其放置元素。

有很多解决方案可以解决这个问题。一个简单的将是改变

while (t1.isAlive() || t2.isAlive() || t3.isAlive() || t4.isAlive()) {
  System.out.println("T1 alive:" + t1.isAlive() + ", T2 alive:" + t2.isAlive() + ",T3 alive:" + t3.isAlive() + ",T4 alive:" + t4.isAlive());
  Thread.sleep(5000);
}
Run Code Online (Sandbox Code Playgroud)

while (t1.isAlive() || t2.isAlive() || t3.isAlive() || t4.isAlive()) {
  System.out.println("T1 alive:" + t1.isAlive() + ", T2 alive:" + t2.isAlive() + ",T3 alive:" + t3.isAlive() + ",T4 alive:" + t4.isAlive());
  queue.put("");
  Thread.sleep(5000);
}
Run Code Online (Sandbox Code Playgroud)

使用该行queue.put("");确保元素将被放置,直到所有线程终止。在您的情况下,这将最多执行 3 次。这不是最合适的解决方案,并且会使您的线程读取空字符串,但它是最简单的解决方案。如果这不起作用,您可以花一些时间来提出适合您需要的其他同步。

另一种解决方案可以使用队列作为锁来使用额外的同步:

while (!queue.isEmpty() || !zip.isClosed()) {
  synchronized (queue) {
    if(!queue.isEmpty()) {
      String line = queue.take();
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

通过这种方式可以确保检查队列是否为空,并在每个线程没有一起执行的情况下取一个元素,这样没有线程会尝试弹出一个空队列。你必须决定你喜欢哪种方法。第二个需要额外的同步,但不会强制您的线程弹出空字符串。