等待异步任务的安全有效方法

mku*_*ski 16 java concurrency multithreading

在系统中,我有一个对象-称为它TaskProcessor。它保存任务队列,这些任务由一些线程池(ExecutorService+ PriorityBlockingQueue)执行。每个任务的结果都保存在数据库中的某个唯一标识符下。

知道此唯一标识符的用户可以检查此任务的结果。结果可能在数据库中,但是任务仍然可以在队列中等待执行。在这种情况下,UserThread应等待任务完成。

此外,以下假设是有效的:

  • 如果其他人知道唯一标识符,则其他人可以将任务加入队列TaskProcessor而某些随机人UserThread可以访问结果。

  • UserThreadTaskProcess在同一个应用中。TaskProcessor包含线程池,并且UserThread只是servlet线程。

  • UserThread要求结果时应将其阻止,并且结果尚未完成。UserThreadTaskProcessor按唯一标识符分组的一项或多项完整任务完成后,应立即取消阻止

我的第一次尝试(幼稚的尝试)是在循环中检查结果并休眠一段时间:

// UserThread
while(!checkResultIsInDatabase(uniqueIdentifier))
  sleep(someTime)
Run Code Online (Sandbox Code Playgroud)

但是我不喜欢。首先,我在浪费数据库连接。此外,如果任务在睡眠后立即完成,那么即使结果刚刚出现,用户也将等待。

下次尝试基于等待/通知:

//UserThread 
while (!checkResultIsInDatabase())
  taskProcessor.wait()

//TaskProcessor
... some complicated calculations
this.notifyAll()
Run Code Online (Sandbox Code Playgroud)

但是我也不喜欢。如果UserThreads要使用更多功能TaskProcessor,那么每次完成某些任务时,它们都会被不必要地唤醒,而且它们还会进行不必要的数据库调用。

最后一次尝试是基于我所说的waitingRoom

//UserThread
Object mutex = new Object();
taskProcessor.addToWaitingRoom(uniqueIdentifier, mutex)
while (!checkResultIsInDatabase())
  mutex.wait()

//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
  getMutexFromWaitingRoom(taskUniqueIdentifier).notify()
Run Code Online (Sandbox Code Playgroud)

但这似乎并不安全。在数据库check和之间wait(),任务可以完成(notify()由于UserThread尚未调用wait()而不会生效),最终可能会导致死锁。

看来,我应该在某个地方同步它。但是,恐怕它不会有效。有没有办法纠正我的任何尝试,使它们安全有效?也许还有其他更好的方法可以做到这一点?

Cos*_*atu 10

您似乎正在寻找某种未来 / 承诺抽象。看一下CompletableFuture,从Java 8开始可用。

CompletableFuture<Void> future = CompletableFuture.runAsync(db::yourExpensiveOperation, executor);

// best approach: attach some callback to run when the future is complete, and handle any errors
future.thenRun(this::onSuccess)
        .exceptionally(ex -> logger.error("err", ex));

// if you really need the current thread to block, waiting for the async result:
future.join(); // blocking! returns the result when complete or throws a CompletionException on error
Run Code Online (Sandbox Code Playgroud)

您还可以从异步操作中返回一个(有意义的)值,并将结果传递给回调。为了充分利用这一点,来看看supplyAsync()thenAccept()thenApply()whenComplete()等。

您还可以将多个期货组合成一个或更多。


Nik*_*ski 3

我相信mutexCountDownLatchinwaitingRoom方法替换 of 可以防止僵局。

CountDownLatch latch = new CountDownLatch(1)
taskProcessor.addToWaitingRoom(uniqueIdentifier, latch)
while (!checkResultIsInDatabase())
  // consider timed version
  latch.await()

//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
  getLatchFromWaitingRoom(taskUniqueIdentifier).countDown()
Run Code Online (Sandbox Code Playgroud)