我正在尝试衡量特定方法的性能。当直接调用该方法时,我运行基准测试得很好,但是当该方法使用带有自定义执行器的完整未来时,一切都崩溃了。我已经实现了使用可完成的未来的方法,以便在该方法花费太长时间时强制超时。
@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 5)
@Warmup(iterations = 20)
@Measurement(iterations = 50, timeUnit = TimeUnit.MILLISECONDS)
public String very_big_query(TestState testState) throws Exception {
return testState.transpiler.process(testState.veryBigQuery);
}
Run Code Online (Sandbox Code Playgroud)
@State(Scope.Thread)
public static class TestState {
String veryBigQuery;
Transpiler transpiler;
@Setup(Level.Trial)
public void doSetupTrial() throws Exception {
veryBigQuery = "(";
for(int i = 0; i < 99; i++) {
veryBigQuery += String.format("java_%s OR ", i);
}
veryBigQuery += "java_100) AND (";
for(int i = 100; i < 199; i++) {
veryBigQuery += String.format("java_%s OR …Run Code Online (Sandbox Code Playgroud) Java 文档说CompletableFuture:supplyAsync(Supplier<U> supplier)在 中运行任务,ForkJoinPool#commonPool()而CompletableFuture:suppleAsync(supplier, executor)在给定的执行器中运行任务。
我正在尝试弄清楚该使用哪一个。所以我的问题是:
ForkJoinPool#commonPool()?supplyAsync(supplier)vs supplyAsync(supplier, executor)?我为 Future 使用简单的 java 代码:
Future<Integer> futureTask = executor.submit(() -> {
System.out.println("I'm Callable task.");
return 1 + 1;
});
Run Code Online (Sandbox Code Playgroud)
当我将它粘贴到 kotlin 类时,它会转换为:
val futureTask = executor.submit {
println("I'm Callable task.")
1 + 1
}
Run Code Online (Sandbox Code Playgroud)
但是当我尝试获取 Java 类中的值时,我得到的是 null 而不是 number
val integ = futureTask.get()
Run Code Online (Sandbox Code Playgroud)
当我在java代码中编写return时,我的ide警告这里不允许return。
接下来是完整的 kotlin 代码:
fun main(args: Array<String>) {
val executor = Executors.newSingleThreadExecutor()
val futureTask = executor.submit {
println("I'm Callable task.")
1 + 1
}
println(futureTask.get())
executor.shutdown() }
Run Code Online (Sandbox Code Playgroud)
输出:
I'm Callable task.
null
Run Code Online (Sandbox Code Playgroud)
什么是正确的语法Future?
我使用以下工作代码CompleteableFuture:
CompletableFuture<SomeObject> future = CompletableFuture.
supplyAsync( () -> f1() ).
thenApplyAsync( f1Output -> f2( f1Output ) ).
thenApplyAsync( f2Output -> f3( f2Output ) );
Run Code Online (Sandbox Code Playgroud)
是否有可能运行另一个接收f1Output类似以下内容的未来input?:
CompletableFuture<SomeObject> future = CompletableFuture.
supplyAsync( () -> f1() ).
thenApplyAsync( f1Output -> f2( f1Output ) ).
someApiThatRuns( f1Output -> f4( f1Output ) ). // <--
thenApplyAsync( f2Output -> f3( f2Output ) );
Run Code Online (Sandbox Code Playgroud)
如果这简化了事情,人们可以忽略 . 返回的结果f4()。
我英语不好。
我使用异步方法。
选项1
public CompletableFuture<Integer> getDiscountPriceAsync(Integer price) {
return CompletableFuture.supplyAsync(() -> {
log.info("supplyAsync");
return (int)(price * 0.9);
}, threadPoolTaskExecutor);
}
Run Code Online (Sandbox Code Playgroud)
选项2
@Async
public CompletableFuture<Integer> getDiscountPriceAsync(Integer price) {
return CompletableFuture.supplyAsync(() -> {
log.info("supplyAsync");
return (int)(price * 0.9);
}, threadPoolTaskExecutor);
}
Run Code Online (Sandbox Code Playgroud)
我想知道使用 @Async 和不使用它有什么区别。
我认为第一个Option1提供了足够的异步方法。
但是,像Option2那样使用它是否正确呢?
这是我面临的问题的简短代码版本:
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
/*
try {
Thread.sleep(2000);
} catch (InterruptedException ignored) {}
*/
//System.out.println("supplyAsync: " + Thread.currentThread().getName());
return 1;
})
.thenApply(i -> {
System.out.println("apply: " + Thread.currentThread().getName());
return i + 1;
})
.thenAccept((i) -> {
System.out.println("accept: " + Thread.currentThread().getName());
System.out.println("result: " + i);
}).join();
}
Run Code Online (Sandbox Code Playgroud)
这是我得到的输出:
apply: main
accept: main
result: 2
Run Code Online (Sandbox Code Playgroud)
看到main那里我很惊讶!Thread.sleep()我预计当我取消注释该调用或什至取消注释单个语句时会发生类似的情况sysout:
supplyAsync: ForkJoinPool.commonPool-worker-1
apply: ForkJoinPool.commonPool-worker-1
accept: ForkJoinPool.commonPool-worker-1
result: 2
Run Code Online (Sandbox Code Playgroud)
我理解thenApplyAsync()将确保它不会在main线程上运行,但我想避免将供应商从运行的线程返回的数据传递supplyAsync …
java multithreading asynchronous completable-future completion-stage
假设我们有 n 个独立的阻塞 IO 任务,例如对另一台服务器进行休息调用的任务。然后我们需要合并所有答案。每个任务的处理时间可以超过 10 秒。
我们可以按顺序处理它,最后花费 ~n*10 秒:
Task1Ans task1 = service1.doSomething();
Task2Ans task2 = service2.doSomething()
...
return result;
Run Code Online (Sandbox Code Playgroud)
另一种策略是使用 CompletableFuture 以并行方式处理它,并在所有任务上花费约 10 秒:
CompletableFuture<Task1Ans> task1Cs = CompletableFuture.supplyAsync(() -> service1.doSomething(), bestExecutor);
CompletableFuture<Task2Ans> task2Cs = CompletableFuture.supplyAsync(() -> service2.doSomething(), bestExecutor);
return CompletableFuture.allOf(task1Cs, task2Cs)
.thenApply(nothing -> {
...
// combine task1, task2 into result object
return result;
}).join();
Run Code Online (Sandbox Code Playgroud)
第二种方法有好处,但我不明白哪种类型的线程池最适合此类任务:
ExecutorService bestExecutor = Executors.newFixedThreadPool(30) /// or Executors.newCachedThreadPool() or Executors.newWorkStealingPool()
Run Code Online (Sandbox Code Playgroud)
我的问题是哪个 ExecutorService 最适合处理 n 并行阻塞 IO 任务。
据说我有以下方法
public static Stream<CompletableFuture<String>> findPricesStream(String product) {}
Run Code Online (Sandbox Code Playgroud)
该方法将寻找给定 a 的最便宜价格product,并返回 CompletableFuture 流。
现在我想在流中的值可用时立即对其做出反应。为此,我采用的方法thenAccept和实现可以如下
1. public static void reactToEarliestResultWithRaw() {
2. long start = System.nanoTime();
3. CompletableFuture[] priceFuture = findPricesStream(WHATEVER_PRODUCT_NAME)
4. .map(completableFuture -> completableFuture.thenAccept(
5. s -> System.out.println(s + " (done in " + (System.nanoTime() - start) / 1_000_000 + " msecs)")))
6. .toArray(CompletableFuture[]::new);
7. CompletableFuture.allOf(priceFuture).join();
8. System.out.println("All shops have now responded in " + (System.nanoTime() - start) / 1_000_000 + " msecs");
9. }
Run Code Online (Sandbox Code Playgroud)
通过这个实现,我得到了所需的输出
LetsSaveBig …Run Code Online (Sandbox Code Playgroud) 我正在尝试在我的 Spring Boot 应用程序中实现多线程。我只是 Java 多线程的初学者,在进行了一些搜索并阅读了各个页面上的文章后,我需要澄清以下几点。所以;
据我所知,我可以使用Thread,Runnable或CompletableFuture来在 Java 应用程序中实现多线程。CompletableFuture似乎是一种更新、更清洁的方式,但Thread可能有更多优点。那么,我应该根据场景坚持使用CompletableFuture还是全部使用?
基本上我想使用以下方法向同一个服务方法发送 2 个并发请求CompletableFuture:
CompletableFuture<Integer> future1 = fetchAsync(1);
CompletableFuture<Integer> future2 = fetchAsync(2);
Integer result1 = future1.get();
Integer result2 = future2.get();
Run Code Online (Sandbox Code Playgroud)
如何同时发送这些请求,然后根据以下条件返回结果:
我怎样才能做到这一点?我应该用CompletableFuture.anyOf()它吗?
我正在尝试通过CompletableFuture.supplyAsync将优先级队列添加到使用ThreadPoolExecutor和LinkedBlockingQueue的现有应用程序.问题是我无法想出一个设计来分配我可以在PriorityBlockingQueue的Comparator中访问的任务优先级.这是因为我的任务被CompletableFuture包装到一个名为AsyncSupply的私有内部类的实例中,该实例隐藏了私有字段中的原始任务.然后使用这些AsteSupply对象作为Runnables调用Comparator,如下所示:
public class PriorityComparator<T extends Runnable> implements Comparator<T> {
@Override
public int compare(T o1, T o2) {
// T is an AsyncSupply object.
// BUT I WANT SOMETHING I CAN ASSIGN PRIORITIES TOO!
return 0;
}
}
Run Code Online (Sandbox Code Playgroud)
我调查了扩展CompletableFuture的可能性,因此我可以将它包装在一个不同的对象中,但很多CompletableFuture被封装且不可用.因此,扩展它似乎不是一种选择.也没有用适配器封装它,因为它实现了非常宽的接口.
除了复制整个CompletableFuture并修改它之外,我不确定如何解决这个问题.有任何想法吗?
java executorservice threadpoolexecutor java-threads completable-future
java ×9
asynchronous ×2
java-8 ×2
benchmarking ×1
java-threads ×1
jmh ×1
kotlin ×1
process ×1
spring ×1
spring-boot ×1