And*_*nov 6 java reentrantlock
我正在尝试编写一个测试,在其中我想演示公平和不公平可重入锁之间的区别。该测试使用ThreadPoolExecutor并由多个迭代组成,每个迭代都有以下步骤:
AtomicInteger状态。因此,对于公平锁,共享状态的最终值必须等于最后一个任务的索引。但测试在所有执行中约有 50% 失败。
我的代码如下所示:
@Test
void should_be_fair() throws InterruptedException, ExecutionException {
int iterationsCount = 100;
int waitingThreadsCount = 5;
ReentrantLock lock = new ReentrantLock(true);
Semaphore unlockingSemaphore = new Semaphore(1);
boolean wasAnyThreadUnfair = false;
for (int i = 0; i < iterationsCount; i++) {
unlockingSemaphore.acquire();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(waitingThreadsCount + 1);
Future<?> lockingFuture = executor.submit(() -> {
try {
lock.lock();
unlockingSemaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
unlockingSemaphore.release();
lock.unlock();
}
});
AtomicInteger sharedState = new AtomicInteger();
List<Future<Integer>> futures = IntStream.rangeClosed(1, waitingThreadsCount)
.sequential()
.mapToObj(j -> executor.submit(() -> {
try {
lock.lock();
System.out.println("Acquiring lock for j=" + j);
return sharedState.updateAndGet((k) -> j);
} finally {
lock.unlock();
}
}))
.toList();
unlockingSemaphore.release();
lockingFuture.get();
futures.forEach(f -> {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
executor.shutdown();
System.out.println("Ended " + i + "-th cycle with the last index=" + sharedState.get());
if (sharedState.get() != waitingThreadsCount) {
wasAnyThreadUnfair = true;
break;
}
}
Assertions.assertThat(wasAnyThreadUnfair).isFalse();
}
Run Code Online (Sandbox Code Playgroud)
问题:
这个测试有什么问题吗?我可以修复什么以使测试在 100% 的执行中通过?
问题在于任务获取锁的顺序。提交顺序不保证与启动顺序相同。所以我添加了库的使用Awaitility来等待每个任务被锁获取阻塞。不幸的是,与非公平锁相关的测试不再通过,但这是另一个问题。
@Test
void should_be_fair() throws InterruptedException, ExecutionException {
Assertions.assertThat(wasAnyThreadUnfair(new ReentrantLock(true), 100, 5)).isFalse();
}
@Test
// TODO: doesn't pass
void should_be_unfair() throws InterruptedException, ExecutionException {
Assertions.assertThat(wasAnyThreadUnfair(new ReentrantLock(false), 100, 5)).isTrue();
}
private boolean wasAnyThreadUnfair(ReentrantLock lock, int iterationsCount, int waitingThreadsCount)
throws InterruptedException {
Semaphore unlockingSemaphore = new Semaphore(1);
boolean wasAnyThreadUnfair = false;
for (int i = 0; i < iterationsCount; i++) {
unlockingSemaphore.acquire();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(waitingThreadsCount + 1);
Future<?> lockingFuture = executor.submit(() -> {
try {
lock.lock();
unlockingSemaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
unlockingSemaphore.release();
lock.unlock();
}
});
AtomicInteger sharedState = new AtomicInteger();
List<Future<Integer>> futures = IntStream.rangeClosed(1, waitingThreadsCount)
.mapToObj(j -> {
Future<Integer> submitted = executor.submit(() -> {
try {
lock.lock();
System.out.println("Acquiring lock for j=" + j);
return sharedState.updateAndGet((k) -> j);
} finally {
lock.unlock();
}
});
await().atMost(150, TimeUnit.MILLISECONDS).until(() -> lock.getQueueLength() == j);
return submitted;
})
.toList();
unlockingSemaphore.release();
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
System.out.println("Ended " + i + "-th cycle with the last index=" + sharedState.get());
if (sharedState.get() != waitingThreadsCount) {
wasAnyThreadUnfair = true;
break;
}
}
return wasAnyThreadUnfair;
}
Run Code Online (Sandbox Code Playgroud)