Java - 线程卡在“Park”状态

Jac*_*ble 1 java multithreading

我无法同时运行超过 100 个线程。当我进行线程转储时,我注意到其中许多都在parked status,即

停车等待 <0x00000000827e1760> (java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)。

该程序在大约 25 个或更少的线程下运行良好。有没有办法确定导致并发锁的原因,和/或防止它?它使用 Executor 服务在大小为 200 的固定池中运行。

对于缺少代码表示歉意 - 它是专有的,并且有很多需要更改来混淆它。

Dul*_*ren 6

您是否使用某种ThreadPoolExecutor,例如java.util.concurrent.Executors类提供的线程池执行器?也许您正面临着任务通过未捕获的静默异常来完成的情况。转储片段看起来像一个不活动的池线程,而获得不活动线程(应该是活动的)的原因之一是抛出异常,但被默认线程池实现包围。

LockSupport.park()

在线程池中,等待任务的线程被锁定LockSupport.park();。请参阅java.util.concurrent.locks.AbstractQueuedSynchronizer openjdk 的源代码

public final void await() throws InterruptedException {
    // code omitted
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // code omitted
}
Run Code Online (Sandbox Code Playgroud)

这意味着线程正在执行的任务已完成(是否突然),现在线程正在等待另一个任务执行(请参阅java.util.concurrent.ThreadPoolExecutor openjdk 源代码):

private Runnable getTask() {
    // ...
    Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();  <== the thread is blocked here  
    // ...
}
Run Code Online (Sandbox Code Playgroud)

正如我们所看到的,线程在调用中被锁定workQueue.take();

因此,很快,处于“停放状态”的线程只是在先前的任务完成后等待新的任务。

为什么我的任务不再运行?

完成任务的最合理原因是正常结束run()。任务流完成,然后任务由各自的所有者线程释放。一旦线程释放了任务,只要有一个任务,它就准备好执行另一个任务。

检查这种情况的一种直接方法是在方法末尾记录一些内容run()

class MyRunnable implements Runnable {

    public void run() {
        while(/*some condition*/) {
           // do my things
        }
        log.info("My Runnable has finished for now!");
    }
}
Run Code Online (Sandbox Code Playgroud)

如果记录消息还不够,您可以调用另一个对象的方法。

木头下的例外

另一个(最)可能的原因是任务执行期间抛出未捕获的异常。在线程池中,像这样的未经检查的异常将突然停止方法执行并(令人惊讶地)被吞入对象中java.util.concurrent.FutureTask。为了避免这样的事情,我使用以下习惯用法:

class MyRunnable implements Runnable {
    public void run() {
        while(/*some condition*/) {
            try {
                // do my things
            } catch (Throwable throwable) {
                handle(throwable);
            }
        }
        log.info("My Runnable has finished for now!");
    }

    private void handle(Throwable throwable) {
        // ...
    }
}
Run Code Online (Sandbox Code Playgroud)

或者根据我也使用的逻辑/性能要求:

    public void run() {
        try {
            while(/*some condition*/) {
                // do my things
            }
        } catch (Throwable throwable) {
            handle(throwable);
        }
        System.out.println("My Runnable has finished for now!");
    }
Run Code Online (Sandbox Code Playgroud)

下面的代码举例说明了此处评论的问题的实际操作:

package mypocs;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ExceptionSwallowingInThreadPoolsPoC {

  public static void main(String[] args) {

    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);

    final Object LOCK = new Object();

    threadPoolExecutor.submit(() -> {
      while (true) {
        synchronized (LOCK) {
          System.out.println("Thread 'A' never ends");
        }
        Thread.sleep(1000L);
      }
    });

    threadPoolExecutor.submit(() -> {
      int lifespan = 3;
      while (lifespan > 0) {
    synchronized (LOCK) {
              System.out.println("Thread 'B' is living for " + lifespan + " seconds");
        }
        lifespan--;
        try {
          Thread.sleep(1000L);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      System.out.println("Thread 'B' finished");
    });

    threadPoolExecutor.submit(() -> {
      int lifespan = 3;
      while (lifespan > 0) {
        synchronized (LOCK) {
          System.out.println("Thread 'C' is living for " + lifespan + " seconds");
        }
        lifespan--;

        if (lifespan < 1) {
          throw new RuntimeException("lifespan reached zero");
        }

        try {
          Thread.sleep(1000L);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      System.out.println("Thread 'C' finished");
    });

    while (true) {
      try {
        Thread.sleep(1000L);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      synchronized (LOCK) {
        System.out.println("==== begin");
        System.out.println("getActiveCount: " + threadPoolExecutor.getActiveCount());
        System.out.println("getCompletedTaskCount: " + threadPoolExecutor.getCompletedTaskCount());
        System.out.println("getPoolSize: " + threadPoolExecutor.getPoolSize());
        System.out.println("==== end");
      }
    }

  }

}
Run Code Online (Sandbox Code Playgroud)

该代码应该输出类似以下内容:

Thread 'A' never ends
Thread 'B' is living for 3 seconds
Thread 'C' is living for 3 seconds
Thread 'C' is living for 2 seconds
==== begin
getActiveCount: 3
getCompletedTaskCount: 0
getPoolSize: 3
==== end
Thread 'B' is living for 2 seconds
Thread 'A' never ends
==== begin
getActiveCount: 3
getCompletedTaskCount: 0
getPoolSize: 3
==== end
Thread 'C' is living for 1 seconds
Thread 'B' is living for 1 seconds
Thread 'A' never ends
Thread 'B' finished
==== begin
getActiveCount: 1
getCompletedTaskCount: 2
getPoolSize: 3
==== end
Thread 'A' never ends
Thread 'A' never ends
...
Run Code Online (Sandbox Code Playgroud)