mku*_*ski 16 java concurrency multithreading
在系统中,我有一个对象-称为它TaskProcessor
。它保存任务队列,这些任务由一些线程池(ExecutorService
+ PriorityBlockingQueue
)执行。每个任务的结果都保存在数据库中的某个唯一标识符下。
知道此唯一标识符的用户可以检查此任务的结果。结果可能在数据库中,但是任务仍然可以在队列中等待执行。在这种情况下,UserThread
应等待任务完成。
此外,以下假设是有效的:
如果其他人知道唯一标识符,则其他人可以将任务加入队列,TaskProcessor
而某些随机人UserThread
可以访问结果。
UserThread
并TaskProcess
在同一个应用中。TaskProcessor
包含线程池,并且UserThread
只是servlet线程。
UserThread
要求结果时应将其阻止,并且结果尚未完成。UserThread
在TaskProcessor
按唯一标识符分组的一项或多项完整任务完成后,应立即取消阻止
我的第一次尝试(幼稚的尝试)是在循环中检查结果并休眠一段时间:
// UserThread
while(!checkResultIsInDatabase(uniqueIdentifier))
sleep(someTime)
Run Code Online (Sandbox Code Playgroud)
但是我不喜欢。首先,我在浪费数据库连接。此外,如果任务在睡眠后立即完成,那么即使结果刚刚出现,用户也将等待。
下次尝试基于等待/通知:
//UserThread
while (!checkResultIsInDatabase())
taskProcessor.wait()
//TaskProcessor
... some complicated calculations
this.notifyAll()
Run Code Online (Sandbox Code Playgroud)
但是我也不喜欢。如果UserThreads
要使用更多功能TaskProcessor
,那么每次完成某些任务时,它们都会被不必要地唤醒,而且它们还会进行不必要的数据库调用。
最后一次尝试是基于我所说的waitingRoom
:
//UserThread
Object mutex = new Object();
taskProcessor.addToWaitingRoom(uniqueIdentifier, mutex)
while (!checkResultIsInDatabase())
mutex.wait()
//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getMutexFromWaitingRoom(taskUniqueIdentifier).notify()
Run Code Online (Sandbox Code Playgroud)
但这似乎并不安全。在数据库check和之间wait()
,任务可以完成(notify()
由于UserThread
尚未调用wait()
而不会生效),最终可能会导致死锁。
看来,我应该在某个地方同步它。但是,恐怕它不会有效。有没有办法纠正我的任何尝试,使它们安全有效?也许还有其他更好的方法可以做到这一点?
Cos*_*atu 10
您似乎正在寻找某种未来 / 承诺抽象。看一下CompletableFuture,从Java 8开始可用。
CompletableFuture<Void> future = CompletableFuture.runAsync(db::yourExpensiveOperation, executor);
// best approach: attach some callback to run when the future is complete, and handle any errors
future.thenRun(this::onSuccess)
.exceptionally(ex -> logger.error("err", ex));
// if you really need the current thread to block, waiting for the async result:
future.join(); // blocking! returns the result when complete or throws a CompletionException on error
Run Code Online (Sandbox Code Playgroud)
您还可以从异步操作中返回一个(有意义的)值,并将结果传递给回调。为了充分利用这一点,来看看supplyAsync()
,thenAccept()
,thenApply()
,whenComplete()
等。
您还可以将多个期货组合成一个或更多。
我相信mutex
用CountDownLatch
inwaitingRoom
方法替换 of 可以防止僵局。
CountDownLatch latch = new CountDownLatch(1)
taskProcessor.addToWaitingRoom(uniqueIdentifier, latch)
while (!checkResultIsInDatabase())
// consider timed version
latch.await()
//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getLatchFromWaitingRoom(taskUniqueIdentifier).countDown()
Run Code Online (Sandbox Code Playgroud)