如何在 Spring Boot 中最好地实现多线程方法

gtl*_*wig 0 java multithreading spring-boot

我一直在努力为我正在开发的应用程序实现多线程方法。

我想要在并行线程中运行的部分最初是用一个关于列表的 for 循环构建的。

@Service
public ApplicationServiceImpl implements ApplicationService {

    @Override
    public ResponseEntity<Void> startProcess(List<MyObject> myObjectList) throws Exception {
        for (MyObject myObject : myObjectList) {
            AnotherTypeOfObject anotherTypeOfObject = runMethodA(myObject);
            YetAnotherTypeOfObject yetAnotherTypeOfObject = runMethodB(anotherTypeOfObject);
            runMethodC(yetAnotherTypeOfObject, aStringValue, anotherStringValue);
            runMethodD(yetAnotherTypeOfObject);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

方法private AnotherTypeOfObject runMethodA(MyObject myObject) {...}private YetAnotherTypeOfObject yetAnotherTypeOfObject(AnotherTypeOfObject anotherTypeOfObject) {...}private void runMethodC(YetAnotherTypeOfObject yetAnotherTypeOfObject, String aStringValue, String anotherStringValue) {...}private void runMethodD(MyObject myObject) {...}仅使用局部变量。

我花了很多时间寻找一种解决方案,该解决方案允许触发 100 个线程列表,MyObject而不是一个接一个。

我所做的是创建一个:

@Configuration
@EnableAsync
public class AsyncConfiguration() {

    @Bean(name = "threadPoolTaskExecutor")
    public Executor aSyncExecutor() {
        final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(4);
        threadPoolTaskExecutor.setMaxPoolSize(4);
        threadPoolTaskExecutor.setQueueCapacity(50);
        threadPoolTaskExecutor.setThreadNamePrefix("threadNamePrefix");
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}
Run Code Online (Sandbox Code Playgroud)

我确实通过方法 A、B、C 和 D 进行了大量操作log.info("some recognizable text"),因此我可以确定发生了什么,并将这些方法聚合为一个类似的方法

private void runThreads(MyObject myObject, String aStringValue, String anotherStringValue) {
    AnotherTypeOfObject anotherTypeOfObject = runMethodA(myObject);
    YetAnotherTypeOfObject yetAnotherTypeOfObject = runMethodB(anotherTypeOfObject);
    runMethodC(yetAnotherTypeOfObject, aStringValue, anotherStringValue);
    runMethodD(yetAnotherTypeOfObject);
}
Run Code Online (Sandbox Code Playgroud)

我尝试将主要方法运行为:

@Override
@Async("threadPoolTaskExecutor")
public ResponseEntity<Void> startProcess(List<MyObject> myObjectList) throws Exception {
    String aStringValue = myObject.getAStringValue();
    String anotherStringValue = myObject.getAnotherStringValue();
    myObjectList.forEach(myObject -> runThreads(myObject, aStringValue, anotherStringValue));
}
Run Code Online (Sandbox Code Playgroud)

我仍然没有得到为该runThreads(...) {}方法触发几个线程的预期结果,因此处理是并行完成的。

我在这里缺少什么?

Gee*_*nte 5

如果只是为了并行运行集合的所有元素,那么您可以使用 Stream.parallel()。它使用默认的 ForkJoinPool,每个 CPU 核心都有一个线程。这是Java 8中引入的最简单的方法。

myObjectList.stream()
        .parallel()
        .forEach(myObject -> runThreads(myObject, myObject.getAStringValue(),  myObject.getAnotherStringValue()));
Run Code Online (Sandbox Code Playgroud)

为此,您不需要任何 @Async 或 Spring 提供的执行器。

您可以使用自定义 ForkJoinPool 来自定义线程数,但默认值也可能效果很好。

ForkJoinPool customThreadPool = new ForkJoinPool(4);
customThreadPool.invoke(
        () -> myObjectList.stream()
                .parallel()
                .forEach(myObject -> runThreads(myObject, myObject.getAStringValue(),  myObject.getAnotherStringValue())));
Run Code Online (Sandbox Code Playgroud)