mar*_*hon 8 java concurrency java.util.concurrent
我希望懒洋洋地创建一些东西并将结果缓存为优化.下面的代码是安全有效的,还是有更好的方法来做到这一点?这里需要比较和设置循环吗?
...
AtomicReference<V> fCachedValue = new AtomicReference<>();
public V getLazy() {
V result = fCachedValue.get();
if (result == null) {
result = costlyIdempotentOperation();
fCachedValue.set(result);
}
return result;
}
Run Code Online (Sandbox Code Playgroud)
编辑:我的示例中设置的值来自expensiveIdempotentOperation(),无论什么线程调用它,它总是相同的.
这不是一个伟大的系统.问题是两个线程可能会找到result == null,并且两者都将设置fCachedValue为新的结果值.
您想使用compareAndSet(...)方法:
AtomicReference<V> fCachedValue = new AtomicReference<>();
public V getLazy() {
V result = fCachedValue.get();
if (result == null) {
result = costlyIdempotentOperation();
if (!fCachedValue.compareAndSet(null, result)) {
return fCachedValue.get();
}
}
return result;
}
Run Code Online (Sandbox Code Playgroud)
如果多个线程在初始化之前进入该方法,则它们都可能尝试创建大型结果实例.他们都将创建自己的版本,但完成该过程的第一个将是将结果存储在AtomicReference中的人.其他线程将完成他们的工作,然后处置他们result,而是使用result由'winner'创建的实例.
尝试AtomicInitializer或AtomicSafeInitializer:
class CachedValue extends AtomicInitializer<V> {
@Override
public V initialize() {
return costlyIdempotentOperation();
}
}
Run Code Online (Sandbox Code Playgroud)
这扩展了@TwoThe关于如何使用的答案。AtomicReference<Future<V>>
基本上,如果您不介意在代码中包含(稍微昂贵一点)synchronized部分,最简单(也是最易读)的解决方案是使用双重检查锁定习惯用法(使用volatile)。
如果您仍然想使用CAS(这就是整个类型系列的Atomic*目的),则必须使用AtomicReference<Future<V>>, not AtomicReference<V>(否则您最终可能会使用多个线程计算相同的昂贵值)。
但这里有另一个问题:您可能会获得一个有效的Future<V>实例并在多个线程之间共享它,但该实例本身可能无法使用,因为您昂贵的计算可能会失败。这导致我们需要fCachedValue.set(null)在某些或所有异常情况下重新设置我们拥有的原子引用 ()。
上面的内容意味着调用一次已经不够了fCachedValue.compareAndSet(null, new FutureTask(...))——您必须自动测试引用是否包含非null值,并在必要时重新初始化它(在每次调用时)。幸运的是,该类AtomicReference具有仅在循环中getAndUpdate(...)调用的方法。compareAndSet(...)所以生成的代码可能如下所示:
class ConcurrentLazy<V> implements Callable<V> {
private final AtomicReference<Future<V>> fCachedValue = new AtomicReference<>();
private final Callable<V> callable;
public ConcurrentLazy(final Callable<V> callable) {
this.callable = callable;
}
/**
* {@inheritDoc}
*
* @throws Error if thrown by the underlying callable task.
* @throws RuntimeException if thrown by the underlying callable task,
* or the task throws a checked exception,
* or the task is interrupted (in this last case, it's the
* client's responsibility to process the cause of the
* exception).
* @see Callable#call()
*/
@Override
public V call() {
final RunnableFuture<V> newTask = new FutureTask<>(this.callable);
final Future<V> oldTask = this.fCachedValue.getAndUpdate(f -> {
/*
* If the atomic reference is un-initialised or reset,
* set it to the new task. Otherwise, return the
* previous (running or completed) task.
*/
return f == null ? newTask : f;
});
if (oldTask == null) {
/*
* Compute the new value on the current thread.
*/
newTask.run();
}
try {
return (oldTask == null ? newTask : oldTask).get();
} catch (final ExecutionException ee) {
/*
* Re-set the reference.
*/
this.fCachedValue.set(null);
final Throwable cause = ee.getCause();
if (cause instanceof Error) {
throw (Error) cause;
}
throw toUnchecked(cause);
} catch (final InterruptedException ie) {
/*
* Re-set the reference.
*/
this.fCachedValue.set(null);
/*
* It's the client's responsibility to check the cause.
*/
throw new RuntimeException(ie);
}
}
private static RuntimeException toUnchecked(final Throwable t) {
return t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);
}
}
Run Code Online (Sandbox Code Playgroud)
在Kotlin中,上面的内容可以用更简单的方式表达(() -> V表示您的惰性计算):
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.FutureTask
import java.util.concurrent.atomic.AtomicReference
fun <V> (() -> V).concurrent(): () -> V {
/*
* The cached result of the computation.
*/
val valueRef = AtomicReference<Future<V>?>()
return {
val newTask = FutureTask(this)
val oldTaskOrNull = valueRef.getAndUpdate { oldTaskOrNull ->
oldTaskOrNull ?: newTask
}
if (oldTaskOrNull == null) {
/*
* Compute the new value on the current thread.
*/
newTask.run()
}
try {
(oldTaskOrNull ?: newTask).get()
}
catch (ee: ExecutionException) {
/*
* Re-set the reference.
*/
valueRef.set(null)
/*
* Don't mask the compilation failure with an ExecutionException.
*/
throw ee.cause ?: ee
}
catch (e: Exception) {
/*
* Re-set the reference.
*/
valueRef.set(null)
/*
* Most probably, an InterruptedException.
*/
throw e
}
}
}
Run Code Online (Sandbox Code Playgroud)
使用示例:
val lambda = {
println("Computing 2x2...")
val timeNanos = System.nanoTime()
if (timeNanos % 2L == 0L) {
throw IOException(timeNanos.toString())
}
2 * 2
}.concurrent()
val resultSeq = sequence {
while (true) {
val element = try {
lambda().toString()
}
catch (ioe: IOException) {
ioe.toString()
}
yield(element)
}
}
resultSeq.take(50).forEach(::println)
Run Code Online (Sandbox Code Playgroud)
上面的代码运行时会产生以下输出:
Computing 2x2...
java.io.IOException: 93224642168398
Computing 2x2...
4
4
4
4
4
4
...
Run Code Online (Sandbox Code Playgroud)
计算结果可以多次重置,但一旦达到成功结果,总是会返回。同时,当计算正在运行时,lambda可以在多个线程之间共享(例如:缓存在 a 中ConcurrentHashMap)。
PS如果您仅限于Java,您可能还想看看该类CompletableFuture。
| 归档时间: |
|
| 查看次数: |
6357 次 |
| 最近记录: |