Gus*_*son 13 java parallel-processing multithreading java-8 java-stream
我有一组供应商,它们都有相同的结果,但速度不同(且不同).
我想要一种优雅的方式同时启动供应商,并且只要其中一个产生了价值,就将其退回(丢弃其他结果).
我已经尝试过使用并行流,Stream.findAny()但是它总是会阻塞,直到产生所有结果.
这是一个单元测试,展示了我的问题:
import org.junit.Test;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Stream;
import static org.junit.Assert.*;
public class RaceTest {
@Test
public void testRace() {
// Set up suppliers
Set<Supplier<String>> suppliers = Collections.newSetFromMap(new ConcurrentHashMap<>());
suppliers.add(() -> "fast"); // This supplier returns immediately
suppliers.add(() -> {
try {
Thread.sleep(10_000);
return "slow";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}); // This supplier takes 10 seconds to produce a value
Stream<Supplier<String>> stream = suppliers.parallelStream();
assertTrue(stream.isParallel()); // Stream can work in parallel
long start = System.currentTimeMillis();
Optional<String> winner = stream
.map(Supplier::get)
.findAny();
long duration = System.currentTimeMillis() - start;
assertTrue(winner.isPresent()); // Some value was produced
assertEquals("fast", winner.get()); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds
}
}
Run Code Online (Sandbox Code Playgroud)
测试的结果是最后一个断言失败,因为整个测试大约需要10秒才能完成.
我在这做错了什么?
Hol*_*ger 11
在这种情况下,您最好使用Callable而不是Supplier(相同的功能签名)并使用自Java 5以来存在的良好的旧并发API:
Set<Callable<String>> suppliers=new HashSet<>();
suppliers.add(() -> "fast"); // This supplier returns immediately
suppliers.add(() -> {
Thread.sleep(10_000);
return "slow";
}
);
ExecutorService es=Executors.newCachedThreadPool();
try {
String result = es.invokeAny(suppliers);
System.out.println(result);
} catch (InterruptedException|ExecutionException ex) {
Logger.getLogger(MyClass.class.getName()).log(Level.SEVERE, null, ex);
}
es.shutdown();
Run Code Online (Sandbox Code Playgroud)
请注意,整个"全部运行并返回最快"的方法如何成为单个方法调用...
一旦有一个结果可用,它还有取消/中断所有待处理操作的好处,因此慢速操作实际上不会在这里等待整整十秒(好吧,在大多数情况下,因为时间不是确定性的).
| 归档时间: |
|
| 查看次数: |
1052 次 |
| 最近记录: |