Java ExecutorService,newVirtualThreadPerTaskExecutor:线程在从扫描器获取输入之前退出

par*_*cer 0 java multithreading

我有这个简单的代码,用于在第一个线程中从用户那里获取名称,将其添加到队列中,并使用第二个线程打印名称:

import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class TestThreads {
    private static final Scanner scanner = new Scanner(System.in);
    private static ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
    private static LinkedBlockingQueue<String> names = new LinkedBlockingQueue<>();
    
    public static void main(String[] args) {
        executor.submit(() -> {
            try {
                while (true) {
                    String name = getName();
                    names.add(name);
                }
            } catch (Exception e)  {
                e.printStackTrace();
            }
        });

        executor.submit(() -> {
            try {
                while (true) {
                    if (!names.isEmpty())  {
                        String name = names.poll();
                        System.out.println("\n**** The name is " + name + " ****\n");
                    }
                }
            } catch (Exception e)  {
                e.printStackTrace();
            }
        });
    }
    
    private static String getName()  {
        System.out.print("Enter a name: ");
        return scanner.nextLine();
    }
}
Run Code Online (Sandbox Code Playgroud)

然而输出是

Enter a name: 
Process finished with exit code 0
Run Code Online (Sandbox Code Playgroud)

尽管存在无限循环,但程序在用户有机会输入任何名称之前就退出了while。我究竟做错了什么?

dan*_*1st 8

一旦所有非守护线程终止, Java 程序就会结束。

虚拟线程始终是守护线程,因此它们不会阻止程序退出

您可以做的就是阻塞主线程,直到Executor完成所有任务。您可以使用 try-with-resources 来做到这一点:

try(ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()){
    executor.submit(() -> {
        try {
            while (true) {
                String name = getName();
                names.add(name);
            }
        } catch (Exception e)  {
            e.printStackTrace();
        }
    });

    executor.submit(() -> {
        try {
            while (true) {
                if (!names.isEmpty())  {
                    String name = names.poll();
                    System.out.println("\n**** The name is " + name + " ****\n");
                }
            }
        } catch (Exception e)  {
            e.printStackTrace();
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

但是,假设您没有从虚拟线程提交任何其他任务,因为close()调用时您无法提交任何新任务。

get或者,您可以通过调用调用返回的两个Futures来等待 future 完成submit

public static void main(String[] args) throws InterruptedException {
    Future<?> firstTask = executor.submit(() -> {
        try {
            while (true) {
                String name = getName();
                names.add(name);
            }
        } catch (Exception e)  {
            e.printStackTrace();
        }
    });

    Future<?> secondTask = executor.submit(() -> {
        try {
            while (true) {
                if (!names.isEmpty())  {
                    String name = names.poll();
                    System.out.println("\n**** The name is " + name + " ****\n");
                }
            }
        } catch (Exception e)  {
            e.printStackTrace();
        }
    });
    firstTask.get();
    secondTask.get();
}
Run Code Online (Sandbox Code Playgroud)

解决该问题的另一种方法是结构化并发。这样,您可以创建子任务,如下所示:

try(var scope = new StructuredTaskScope.ShutdownOnFailure()){
    scope.fork(() -> {
        try {
            while (true) {
                String name = getName();
                names.add(name);
            }
        } catch (Exception e)  {
            e.printStackTrace();
        }
    });

    scope.fork(() -> {
        try {
            while (true) {
                if (!names.isEmpty())  {
                    String name = names.poll();
                    System.out.println("\n**** The name is " + name + " ****\n");
                }
            }
        } catch (Exception e)  {
            e.printStackTrace();
        }
    });
    scope.join();//wait for all tasks to conplete
    scope.throwIfFailed();//throw an exception if any subtask threw an exception 
}
Run Code Online (Sandbox Code Playgroud)

请注意,在撰写本文时,此 API 目前处于预览状态。