如何以安全有效的方式使用AtomicReference进行延迟创建和设置?

mar*_*hon 8 java concurrency java.util.concurrent

我希望懒洋洋地创建一些东西并将结果缓存为优化.下面的代码是安全有效的,还是有更好的方法来做到这一点?这里需要比较和设置循环吗?

...
AtomicReference<V> fCachedValue = new AtomicReference<>();

public V getLazy() {
    V result = fCachedValue.get();
    if (result == null) {
        result = costlyIdempotentOperation();
        fCachedValue.set(result);
    }
    return result; 
} 
Run Code Online (Sandbox Code Playgroud)

编辑:我的示例中设置的值来自expensiveIdempotentOperation(),无论什么线程调用它,它总是相同的.

rol*_*lfl 9

这不是一个伟大的系统.问题是两个线程可能会找到result == null,并且两者都将设置fCachedValue为新的结果值.

您想使用compareAndSet(...)方法:

AtomicReference<V> fCachedValue = new AtomicReference<>();

public V getLazy() {
    V result = fCachedValue.get();
    if (result == null) {
        result = costlyIdempotentOperation();
        if (!fCachedValue.compareAndSet(null, result)) {
            return fCachedValue.get();
        }
    }
    return result; 
} 
Run Code Online (Sandbox Code Playgroud)

如果多个线程在初始化之前进入该方法,则它们都可能尝试创建大型结果实例.他们都将创建自己的版本,但完成该过程的第一个将是将结果存储在AtomicReference中的人.其他线程将完成他们的工作,然后处置他们result,而是使用result由'winner'创建的实例.


Noe*_*Yap 6

尝试AtomicInitializerAtomicSafeInitializer

class CachedValue extends AtomicInitializer<V> {
  @Override
  public V initialize() {
    return costlyIdempotentOperation();
  }
}
Run Code Online (Sandbox Code Playgroud)


Bas*_*ass 5

这扩展了@TwoThe关于如何使用的答案。AtomicReference<Future<V>>

基本上,如果您不介意在代码中包含(稍微昂贵一点)synchronized部分,最简单(也是最易读)的解决方案是使用双重检查锁定习惯用法(使用volatile)。

如果您仍然想使用CAS(这就是整个类型系列的Atomic*目的),则必须使用AtomicReference<Future<V>>, not AtomicReference<V>(否则您最终可能会使用多个线程计算相同的昂贵值)。

但这里有另一个问题:您可能会获得一个有效的Future<V>实例并在多个线程之间共享它,但该实例本身可能无法使用,因为您昂贵的计算可能会失败。这导致我们需要fCachedValue.set(null)在某些或所有异常情况下重新设置我们拥有的原子引用 ()。

上面的内容意味着调用一次已经不够了fCachedValue.compareAndSet(null, new FutureTask(...))——您必须自动测试引用是否包含非null值,并在必要时重新初始化它(在每次调用时)。幸运的是,该类AtomicReference具有仅在循环中getAndUpdate(...)调用的方法。compareAndSet(...)所以生成的代码可能如下所示:

class ConcurrentLazy<V> implements Callable<V> {
    private final AtomicReference<Future<V>> fCachedValue = new AtomicReference<>();

    private final Callable<V> callable;

    public ConcurrentLazy(final Callable<V> callable) {
        this.callable = callable;
    }

    /**
     * {@inheritDoc}
     *
     * @throws Error if thrown by the underlying callable task.
     * @throws RuntimeException if thrown by the underlying callable task,
     *         or the task throws a checked exception,
     *         or the task is interrupted (in this last case, it's the
     *         client's responsibility to process the cause of the
     *         exception).
     * @see Callable#call()
     */
    @Override
    public V call() {
        final RunnableFuture<V> newTask = new FutureTask<>(this.callable);
        final Future<V> oldTask = this.fCachedValue.getAndUpdate(f -> {
            /*
             * If the atomic reference is un-initialised or reset,
             * set it to the new task. Otherwise, return the
             * previous (running or completed) task.
             */
            return f == null ? newTask : f;
        });

        if (oldTask == null) {
            /*
             * Compute the new value on the current thread. 
             */
            newTask.run();
        }

        try {
            return (oldTask == null ? newTask : oldTask).get();
        } catch (final ExecutionException ee) {
            /*
             * Re-set the reference.
             */
            this.fCachedValue.set(null);

            final Throwable cause = ee.getCause();
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw toUnchecked(cause);
        } catch (final InterruptedException ie) {
            /*
             * Re-set the reference.
             */
            this.fCachedValue.set(null);

            /*
             * It's the client's responsibility to check the cause.
             */
            throw new RuntimeException(ie);
        }
    }

    private static RuntimeException toUnchecked(final Throwable t) {
        return t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);
    }
}
Run Code Online (Sandbox Code Playgroud)

Kotlin中,上面的内容可以用更简单的方式表达(() -> V表示您的惰性计算):

import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.FutureTask
import java.util.concurrent.atomic.AtomicReference

fun <V> (() -> V).concurrent(): () -> V {
  /*
   * The cached result of the computation.
   */
  val valueRef = AtomicReference<Future<V>?>()

  return {
    val newTask = FutureTask(this)

    val oldTaskOrNull = valueRef.getAndUpdate { oldTaskOrNull ->
      oldTaskOrNull ?: newTask
    }

    if (oldTaskOrNull == null) {
      /*
       * Compute the new value on the current thread.
       */
      newTask.run()
    }

    try {
      (oldTaskOrNull ?: newTask).get()
    }
    catch (ee: ExecutionException) {
      /*
       * Re-set the reference.
       */
      valueRef.set(null)

      /*
       * Don't mask the compilation failure with an ExecutionException.
       */
      throw ee.cause ?: ee
    }
    catch (e: Exception) {
      /*
       * Re-set the reference.
       */
      valueRef.set(null)

      /*
       * Most probably, an InterruptedException.
       */
      throw e
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

使用示例:

  val lambda = {
    println("Computing 2x2...")

    val timeNanos = System.nanoTime()
    if (timeNanos % 2L == 0L) {
      throw IOException(timeNanos.toString())
    }

    2 * 2
  }.concurrent()

  val resultSeq = sequence {
    while (true) {
      val element = try {
        lambda().toString()
      }
      catch (ioe: IOException) {
        ioe.toString()
      }

      yield(element)
    }
  }

  resultSeq.take(50).forEach(::println)
Run Code Online (Sandbox Code Playgroud)

上面的代码运行时会产生以下输出:

Computing 2x2...
java.io.IOException: 93224642168398
Computing 2x2...
4
4
4
4
4
4
...
Run Code Online (Sandbox Code Playgroud)

计算结果可以多次重置,但一旦达到成功结果,总是会返回。同时,当计算正在运行时,lambda可以在多个线程之间共享(例如:缓存在 a 中ConcurrentHashMap)。

PS如果您仅限于Java,您可能还想看看该类CompletableFuture