Spring中如何实现同步作业队列?

use*_*918 1 java queue spring spring-boot

我正在尝试找出如何使用 Spring 实现作业队列。

我已经启动并运行了一台服务器,我计划让用户向其提交 POST 请求。这将接收一些数据,然后将作业排队以处理这些数据。

处理这些数据是一个昂贵的过程,有时可能需要 5 到 20 分钟(取决于需要完成的工作量)。因此,它需要同步运行。即一项工作完成,然后下一项工作可以开始。

例如

  • 用户提交作业A
  • 由于队列为空,作业 A 已启动
  • 另一个用户提交了第二个作业,作业 B
  • 作业 A 仍在运行,因此作业 B 被放入队列中
  • 另一个用户提交了作业 C,作业 A 仍在运行,因此它与作业 B 一起放入队列中。

我最近才开始学习 Spring,所以我正在寻找一些关于如何实现这一目标的想法。

我的想法是建立一个工厂类来接收然后可以安排的工作。

我的终点之一如下所示:

@RequestMapping(value = "/submitjob", method = RequestMethod.POST)
    public void queueJob(
            @RequestPart("file") MultipartFile file
    ) {

        if (file != null) {
           // queue job
        }
        // else return bad response.
    }
Run Code Online (Sandbox Code Playgroud)

任何意见是极大的赞赏。

Sta*_*ski 6

您可以使用java.util.concurrent.ExecutorService单个线程来实现此行为。

注意:此实现可以轻松发展为多线程服务,以便您可以并行运行处理

您必须面对的第一个问题是您不想阻止客户的请求。

如果您MultipartFile直接传递给服务,则它将必须等到文件被处理,这可能会导致超时,因为输入流位于请求中。

首先,您必须复制分段文件才能上传。在你的控制器中:

private final FileProcessingService fileProcessingService;

public StackOverFlowController(FileProcessingService fileProcessingService) {
    this.fileProcessingService = fileProcessingService;
}

@PostMapping(value = "/submitjob")
public void queueJob(@RequestPart("file") MultipartFile multipartFile) throws IOException, ExecutionException, InterruptedException {

    File tempFile = copyInputStreamToTempFile(multipartFile);

    fileProcessingService.queueFile(tempFile);

}

private File copyInputStreamToTempFile(MultipartFile multipartFile) throws IOException {
    File tempFile = File.createTempFile("queued-file-", ".tmp");
    try (OutputStream os = new FileOutputStream(tempFile)) {
        IOUtils.copy(multipartFile.getInputStream(), os);
    }
    return tempFile;
}

        
Run Code Online (Sandbox Code Playgroud)

这里MultipartFile复制到临时文件,但您可以将其保存在目录中, 然后将文件传递给 FileProcessingService,该服务必须是非阻塞的

然后,要创建一个将按顺序处理文件的非阻塞队列,您可以使用单线程ExecutorService. 调用execute会将任务添加到队列中。该方法接受参数的实现Runnable

服务框架可能如下所示:

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Component
@Slf4j
public class FileProcessingService {

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public void queueFile(File fileToProcess) {
        executor.execute(new FileProcessRunnable(fileToProcess));
        log.info("Queued file " + fileToProcess);
    }
}
Run Code Online (Sandbox Code Playgroud)

一个简单的 Runnable 存根,用于Thread.sleep模拟处理:

@Slf4j
public class FileProcessRunnable implements Runnable {

    private final File fileToProcess;

    public FileProcessRunnable(File fileToProcess) {
        this.fileToProcess = fileToProcess;
    }

    @Override
    public void run() {
        process();
        log.info("Processed file " + fileToProcess.getName());
    }

    private void process() {
        try {
            Thread.sleep(1000); //simulating process
        } catch (InterruptedException e) {
            log.error("Error during process", e);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

模拟行为的不太真实的测试:

@Test
@SneakyThrows
void should_queue_file_processing() {
    FileProcessingService fileProcessingService = new FileProcessingService();

    File file1 = File.createTempFile("temp-", ".tmp");
    File file2 = File.createTempFile("temp-", ".tmp");
    File file3 = File.createTempFile("temp-", ".tmp");
    File file4 = File.createTempFile("temp-", ".tmp");

    fileProcessingService.queueFile(file1);
    fileProcessingService.queueFile(file2);
    fileProcessingService.queueFile(file3);
    fileProcessingService.queueFile(file4);

    Thread.sleep(1000 * 5);//await until tasks are completed
}
Run Code Online (Sandbox Code Playgroud)

上面的测试将记录: 在此输入图像描述

如您所见,文件在处理之前已排队

有关执行器的更多信息,请参阅:https://www.baeldung.com/java-executor-service-tutorial

  • 如果实例化多次,它将创建多个线程,因为 FileProcessingService 负责创建 ExecutorService (2认同)
  • spring的@Component确保你的bean是单例的 (2认同)