mar*_*hon 8 java concurrency java.util.concurrent
我希望懒洋洋地创建一些东西并将结果缓存为优化.下面的代码是安全有效的,还是有更好的方法来做到这一点?这里需要比较和设置循环吗?
...
AtomicReference<V> fCachedValue = new AtomicReference<>();
public V getLazy() {
    V result = fCachedValue.get();
    if (result == null) {
        result = costlyIdempotentOperation();
        fCachedValue.set(result);
    }
    return result; 
} 
Run Code Online (Sandbox Code Playgroud)
编辑:我的示例中设置的值来自expensiveIdempotentOperation(),无论什么线程调用它,它总是相同的.
这不是一个伟大的系统.问题是两个线程可能会找到result == null,并且两者都将设置fCachedValue为新的结果值.
您想使用compareAndSet(...)方法:
AtomicReference<V> fCachedValue = new AtomicReference<>();
public V getLazy() {
    V result = fCachedValue.get();
    if (result == null) {
        result = costlyIdempotentOperation();
        if (!fCachedValue.compareAndSet(null, result)) {
            return fCachedValue.get();
        }
    }
    return result; 
} 
Run Code Online (Sandbox Code Playgroud)
如果多个线程在初始化之前进入该方法,则它们都可能尝试创建大型结果实例.他们都将创建自己的版本,但完成该过程的第一个将是将结果存储在AtomicReference中的人.其他线程将完成他们的工作,然后处置他们result,而是使用result由'winner'创建的实例.
尝试AtomicInitializer或AtomicSafeInitializer:
class CachedValue extends AtomicInitializer<V> {
  @Override
  public V initialize() {
    return costlyIdempotentOperation();
  }
}
Run Code Online (Sandbox Code Playgroud)
        这扩展了@TwoThe关于如何使用的答案。AtomicReference<Future<V>>
基本上,如果您不介意在代码中包含(稍微昂贵一点)synchronized部分,最简单(也是最易读)的解决方案是使用双重检查锁定习惯用法(使用volatile)。
如果您仍然想使用CAS(这就是整个类型系列的Atomic*目的),则必须使用AtomicReference<Future<V>>, not AtomicReference<V>(否则您最终可能会使用多个线程计算相同的昂贵值)。
但这里有另一个问题:您可能会获得一个有效的Future<V>实例并在多个线程之间共享它,但该实例本身可能无法使用,因为您昂贵的计算可能会失败。这导致我们需要fCachedValue.set(null)在某些或所有异常情况下重新设置我们拥有的原子引用 ()。
上面的内容意味着调用一次已经不够了fCachedValue.compareAndSet(null, new FutureTask(...))——您必须自动测试引用是否包含非null值,并在必要时重新初始化它(在每次调用时)。幸运的是,该类AtomicReference具有仅在循环中getAndUpdate(...)调用的方法。compareAndSet(...)所以生成的代码可能如下所示:
class ConcurrentLazy<V> implements Callable<V> {
    private final AtomicReference<Future<V>> fCachedValue = new AtomicReference<>();
    private final Callable<V> callable;
    public ConcurrentLazy(final Callable<V> callable) {
        this.callable = callable;
    }
    /**
     * {@inheritDoc}
     *
     * @throws Error if thrown by the underlying callable task.
     * @throws RuntimeException if thrown by the underlying callable task,
     *         or the task throws a checked exception,
     *         or the task is interrupted (in this last case, it's the
     *         client's responsibility to process the cause of the
     *         exception).
     * @see Callable#call()
     */
    @Override
    public V call() {
        final RunnableFuture<V> newTask = new FutureTask<>(this.callable);
        final Future<V> oldTask = this.fCachedValue.getAndUpdate(f -> {
            /*
             * If the atomic reference is un-initialised or reset,
             * set it to the new task. Otherwise, return the
             * previous (running or completed) task.
             */
            return f == null ? newTask : f;
        });
        if (oldTask == null) {
            /*
             * Compute the new value on the current thread. 
             */
            newTask.run();
        }
        try {
            return (oldTask == null ? newTask : oldTask).get();
        } catch (final ExecutionException ee) {
            /*
             * Re-set the reference.
             */
            this.fCachedValue.set(null);
            final Throwable cause = ee.getCause();
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw toUnchecked(cause);
        } catch (final InterruptedException ie) {
            /*
             * Re-set the reference.
             */
            this.fCachedValue.set(null);
            /*
             * It's the client's responsibility to check the cause.
             */
            throw new RuntimeException(ie);
        }
    }
    private static RuntimeException toUnchecked(final Throwable t) {
        return t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);
    }
}
Run Code Online (Sandbox Code Playgroud)
在Kotlin中,上面的内容可以用更简单的方式表达(() -> V表示您的惰性计算):
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.FutureTask
import java.util.concurrent.atomic.AtomicReference
fun <V> (() -> V).concurrent(): () -> V {
  /*
   * The cached result of the computation.
   */
  val valueRef = AtomicReference<Future<V>?>()
  return {
    val newTask = FutureTask(this)
    val oldTaskOrNull = valueRef.getAndUpdate { oldTaskOrNull ->
      oldTaskOrNull ?: newTask
    }
    if (oldTaskOrNull == null) {
      /*
       * Compute the new value on the current thread.
       */
      newTask.run()
    }
    try {
      (oldTaskOrNull ?: newTask).get()
    }
    catch (ee: ExecutionException) {
      /*
       * Re-set the reference.
       */
      valueRef.set(null)
      /*
       * Don't mask the compilation failure with an ExecutionException.
       */
      throw ee.cause ?: ee
    }
    catch (e: Exception) {
      /*
       * Re-set the reference.
       */
      valueRef.set(null)
      /*
       * Most probably, an InterruptedException.
       */
      throw e
    }
  }
}
Run Code Online (Sandbox Code Playgroud)
使用示例:
  val lambda = {
    println("Computing 2x2...")
    val timeNanos = System.nanoTime()
    if (timeNanos % 2L == 0L) {
      throw IOException(timeNanos.toString())
    }
    2 * 2
  }.concurrent()
  val resultSeq = sequence {
    while (true) {
      val element = try {
        lambda().toString()
      }
      catch (ioe: IOException) {
        ioe.toString()
      }
      yield(element)
    }
  }
  resultSeq.take(50).forEach(::println)
Run Code Online (Sandbox Code Playgroud)
上面的代码运行时会产生以下输出:
Computing 2x2...
java.io.IOException: 93224642168398
Computing 2x2...
4
4
4
4
4
4
...
Run Code Online (Sandbox Code Playgroud)
计算结果可以多次重置,但一旦达到成功结果,总是会返回。同时,当计算正在运行时,lambda可以在多个线程之间共享(例如:缓存在 a 中ConcurrentHashMap)。
PS如果您仅限于Java,您可能还想看看该类CompletableFuture。
|   归档时间:  |  
           
  |  
        
|   查看次数:  |  
           6357 次  |  
        
|   最近记录:  |