Java公平可重入锁在多次迭代中不公平

And*_*nov 6 java reentrantlock

我正在尝试编写一个测试,在其中我想演示公平和不公平可重入锁之间的区别。该测试使用ThreadPoolExecutor并由多个迭代组成,每个迭代都有以下步骤:

  1. 创建一个公平锁来测试,并使用 1 个许可的信号量来管理释放锁的时间。
  2. 获取信号量。
  3. 提交一个任务,该任务获取锁,并等待信号量释放。
  4. 提交多个“枚举”任务,每个任务都尝试获取锁,然后更新共享AtomicInteger状态。
  5. 释放信号量并等待所有任务完成。

因此,对于公平锁,共享状态的最终值必须等于最后一个任务的索引。但测试在所有执行中约有 50% 失败。

我的代码如下所示:

    @Test
    void should_be_fair() throws InterruptedException, ExecutionException {
        int iterationsCount = 100;
        int waitingThreadsCount = 5;

        ReentrantLock lock = new ReentrantLock(true);
        Semaphore unlockingSemaphore = new Semaphore(1);
        boolean wasAnyThreadUnfair = false;

        for (int i = 0; i < iterationsCount; i++) {
            unlockingSemaphore.acquire();
            ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(waitingThreadsCount + 1);
            Future<?> lockingFuture = executor.submit(() -> {
                try {
                    lock.lock();
                    unlockingSemaphore.acquire();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    unlockingSemaphore.release();
                    lock.unlock();
                }
            });
            AtomicInteger sharedState = new AtomicInteger();
            List<Future<Integer>> futures = IntStream.rangeClosed(1, waitingThreadsCount)
                    .sequential()
                    .mapToObj(j -> executor.submit(() -> {
                        try {
                            lock.lock();
                            System.out.println("Acquiring lock for j=" + j);
                            return sharedState.updateAndGet((k) -> j);
                        } finally {
                            lock.unlock();
                        }
                    }))
                    .toList();
            unlockingSemaphore.release();
            lockingFuture.get();
            futures.forEach(f -> {
                try {
                    f.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
            executor.shutdown();
            System.out.println("Ended " + i + "-th cycle with the last index=" + sharedState.get());
            if (sharedState.get() != waitingThreadsCount) {
                wasAnyThreadUnfair = true;
                break;
            }
        }

        Assertions.assertThat(wasAnyThreadUnfair).isFalse();
    }
Run Code Online (Sandbox Code Playgroud)

问题

这个测试有什么问题吗?我可以修复什么以使测试在 100% 的执行中通过?

And*_*nov 3

问题在于任务获取锁的顺序。提交顺序不保证与启动顺序相同​​。所以我添加了库的使用Awaitility来等待每个任务被锁获取阻塞。不幸的是,与非公平锁相关的测试不再通过,但这是另一个问题。

   @Test
    void should_be_fair() throws InterruptedException, ExecutionException {
        Assertions.assertThat(wasAnyThreadUnfair(new ReentrantLock(true), 100, 5)).isFalse();
    }

    @Test
    // TODO: doesn't pass
    void should_be_unfair() throws InterruptedException, ExecutionException {
        Assertions.assertThat(wasAnyThreadUnfair(new ReentrantLock(false), 100, 5)).isTrue();
    }

    private boolean wasAnyThreadUnfair(ReentrantLock lock, int iterationsCount, int waitingThreadsCount)
            throws InterruptedException {
        Semaphore unlockingSemaphore = new Semaphore(1);
        boolean wasAnyThreadUnfair = false;

        for (int i = 0; i < iterationsCount; i++) {
            unlockingSemaphore.acquire();
            ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(waitingThreadsCount + 1);
            Future<?> lockingFuture = executor.submit(() -> {
                try {
                    lock.lock();
                    unlockingSemaphore.acquire();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    unlockingSemaphore.release();
                    lock.unlock();
                }
            });
            AtomicInteger sharedState = new AtomicInteger();
            List<Future<Integer>> futures = IntStream.rangeClosed(1, waitingThreadsCount)
                    .mapToObj(j -> {
                        Future<Integer> submitted = executor.submit(() -> {
                            try {
                                lock.lock();
                                System.out.println("Acquiring lock for j=" + j);
                                return sharedState.updateAndGet((k) -> j);
                            } finally {
                                lock.unlock();
                            }
                        });
                        await().atMost(150, TimeUnit.MILLISECONDS).until(() -> lock.getQueueLength() == j);
                        return submitted;
                    })
                    .toList();
            unlockingSemaphore.release();
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.SECONDS);
            System.out.println("Ended " + i + "-th cycle with the last index=" + sharedState.get());
            if (sharedState.get() != waitingThreadsCount) {
                wasAnyThreadUnfair = true;
                break;
            }
        }

        return wasAnyThreadUnfair;
    }
Run Code Online (Sandbox Code Playgroud)

  • 关于您对不公平锁的评论:带有 fair=false 的锁并不能保证不公平,只是不需要公平(即使 fair=true 也不能 100% 保证公平性)。 (2认同)