无锁的等待/通知变体

Mar*_*nik 12 java multithreading

Java要求线程o在调用之前拥有监视器o.wait()o.notify().这是一个众所周知的事实.但是,从根本上要求任何此类机制工作的互斥锁吗?如果提供了API,该怎么办?

compareAndWait
Run Code Online (Sandbox Code Playgroud)

setAndNotify
Run Code Online (Sandbox Code Playgroud)

相反,将CAS动作与线程调度/去调度相结合?这会有一些好处:

  • 即将进入等待状态的线程不会妨碍通知线程的进度;

  • 在被允许检查等待状况之前,他们也不必等待对方;

  • 在通知方面,任何数量的生产者线程都可以同时进行.

提供这样的API是否存在根本的,不可逾越的障碍?

Hol*_*ger 5

LockSupport.park()使用和实现任意等待/通知机制没有问题,LockSupport.unpark(Thread)因为这些基本原语不需要持有任何锁。

\n\n

在不持有锁的情况下, Object.wait/Object.notify或/都不向您提供这样的通知的原因是语义上的。通知的概念是,一个线程等待条件满足,而另一个线程在条件更改为满足状态时停止等待。如果不持有与该条件关联的锁,则无法保证在条件\xe2\x80\x99s 状态测试和线程\xe2\x80\x99s 状态更改之间该条件不会更改。Condition.awaitCondition.signal

\n\n

更具体地说,当一个改变条件的线程通知另一个线程时,有可能在通知发生之前条件已被再次修改。但更糟糕的是,在线程开始之前,条件可能会更改为 \xe2\x80\x9cfulfilled\xe2\x80\x9d ,在wait这种情况下,线程可能会错过通知并永远挂起。

\n\n

即使您能够将条件测试和等待操作融合为一个原子操作,它\xe2\x80\x99 也没有帮助。等待条件本身并不是目的。线程想要等待条件的原因是它想要执行以条件为先决条件的操作,因此在执行操作时不得更改。这就是要点:条件测试和操作必须作为持有锁的一个操作来实现,无论锁的概念是如何实现的。

\n\n

在某些特殊情况下,不会出现此类问题,例如,当已知条件\xe2\x80\x99s 状态转换受到限制时,因此您可以防止条件返回到未满足的状态。\xe2\x80\x99 正是CountDownLatchCyclicBarrier、等工具Phaser的用途,但具有等待/通知预定义语义的通知机制意味着不假设这种特殊情况。

\n


Old*_*eon 2

更多的是一个思想实验,而不是一些真正的工作代码,但这似乎有效。

// My lock class.
public static class Padlock<E extends Enum<E>> {

    // Using Markable because I think I'm going to need it.
    public final AtomicReference<E> value;
    // Perhaps use a set to maintain all waiters.
    Set<Thread> waiters = ConcurrentHashMap.newKeySet();

    public Padlock(E initialValue) {
        this.value = new AtomicReference<>(initialValue);
    }

    /**
     * Waits for the locks value to become the specified key value.
     *
     * @param waitFor - The desired key.
     */
    public void compareAndWait(E waitFor) {
        log("Wait for " + waitFor);
        // Spin on the value.
        while (value.get() != waitFor) {
            log("Park waiting for " + waitFor);
            // Remember me as waiting.
            waiters.add(Thread.currentThread());
            // TODO: What do we do here??
            LockSupport.park();
            log("Awoke " + waitFor);
        }
    }

    /**
     * Sets the locks value to the key value.
     *
     * If this resulted in a change - notify all changers.
     *
     * @param shouldBe - What it should be now.
     * @param makeIt - The new value to set.
     */
    public void setAndNotify(E shouldBe, E makeIt) {
        log("Set " + shouldBe + "->" + makeIt);
        if (value.compareAndSet(shouldBe, makeIt)) {
            log("Notify " + shouldBe + "->" + makeIt);
            // It changed! Notify the waiters.
            for (Thread t : waiters) {
                // Perhaps
                log("Unpark " + t.getName());
                LockSupport.unpark(t);
            }
        }
    }
}

enum State {

    Off, On;
}

private static final long TESTTIME = 30000;
private static final long TICK = 100;

private static final void log(String s) {
    System.out.println(Thread.currentThread().getName() + ": " + s);

}

static class MutexTester implements Runnable {

    final Padlock<State> lock;

    public MutexTester(Padlock<State> lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        Thread.currentThread().setName(this.getClass().getSimpleName());
        long wait = System.currentTimeMillis() + TESTTIME;
        do {
            // Wait for an On!
            lock.compareAndWait(Test.State.On);
            try {
                log("Got it!");
                try {
                    Thread.sleep(TICK);
                } catch (InterruptedException ex) {
                    log("Interrupted!");
                }
            } finally {
                // Release
                lock.setAndNotify(Test.State.On, Test.State.Off);
            }
        } while (System.currentTimeMillis() < wait);
        log("Done");
    }
}

static class RandomSwitcher implements Runnable {

    final Padlock<State> lock;
    final Random random = new Random();

    public RandomSwitcher(Padlock<State> lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        Thread.currentThread().setName(this.getClass().getSimpleName());
        long wait = System.currentTimeMillis() + TESTTIME;
        do {
            // On!
            lock.setAndNotify(Test.State.Off, Test.State.On);
            log("On!");
            pause();
            lock.setAndNotify(Test.State.On, Test.State.Off);
            log("Off!");
            pause();
        } while (System.currentTimeMillis() < wait);
        log("Done");
    }

    private void pause() {
        try {
            // Random wait.
            Thread.sleep(TICK * random.nextInt(10));
        } catch (InterruptedException ex) {
            System.out.println("Interrupted! " + Thread.currentThread().getName());
        }
    }
}

public void test() throws InterruptedException {
    final Padlock<State> lock = new Padlock<>(State.Off);
    Thread t1 = new Thread(new MutexTester(lock));
    t1.start();
    Thread t2 = new Thread(new RandomSwitcher(lock));
    t2.start();
    t1.join();
    t2.join();
}
Run Code Online (Sandbox Code Playgroud)

我已经实现了在释放互斥体时compareAndWait等待独占使用的协议。setAndNotify