标签: completable-future

使用可完成的未来对方法运行基准测试

我正在尝试衡量特定方法的性能。当直接调用该方法时,我运行基准测试得很好,但是当该方法使用带有自定义执行器的完整未来时,一切都崩溃了。我已经实现了使用可完成的未来的方法,以便在该方法花费太长时间时强制超时。

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 5)
@Warmup(iterations = 20)
@Measurement(iterations = 50, timeUnit = TimeUnit.MILLISECONDS)
public String very_big_query(TestState testState) throws Exception {
    return testState.transpiler.process(testState.veryBigQuery);
}
Run Code Online (Sandbox Code Playgroud)
@State(Scope.Thread)
public static class TestState {
    String veryBigQuery;
    Transpiler transpiler;

    @Setup(Level.Trial)
    public void doSetupTrial() throws Exception {
        veryBigQuery = "(";
        for(int i = 0; i < 99; i++) {
            veryBigQuery += String.format("java_%s OR ", i);
        }
        veryBigQuery += "java_100) AND (";
        for(int i = 100; i < 199; i++) {
            veryBigQuery += String.format("java_%s OR …
Run Code Online (Sandbox Code Playgroud)

java benchmarking java-8 jmh completable-future

4
推荐指数
1
解决办法
1272
查看次数

CompletableFuture SupplyAsync(供应商)与 SupplyAsync(供应商,执行者)

Java 文档说CompletableFuture:supplyAsync(Supplier<U> supplier)在 中运行任务,ForkJoinPool#commonPool()CompletableFuture:suppleAsync(supplier, executor)在给定的执行器中运行任务。

我正在尝试弄清楚该使用哪一个。所以我的问题是:

  1. 是什么ForkJoinPool#commonPool()
  2. 我什么时候应该使用 supplyAsync(supplier)vs supplyAsync(supplier, executor)

java multithreading completable-future

4
推荐指数
1
解决办法
2002
查看次数

Kotlin 中的并发 Future 语法

我为 Future 使用简单的 java 代码:

Future<Integer> futureTask = executor.submit(() -> {
    System.out.println("I'm Callable task.");
    return 1 + 1;
});
Run Code Online (Sandbox Code Playgroud)

当我将它粘贴到 kotlin 类时,它会转换为:

val futureTask = executor.submit {
    println("I'm Callable task.")
    1 + 1
}
Run Code Online (Sandbox Code Playgroud)

但是当我尝试获取 Java 类中的值时,我得到的是 null 而不是 number

val integ = futureTask.get()
Run Code Online (Sandbox Code Playgroud)

当我在java代码中编写return时,我的ide警告这里不允许return。

接下来是完整的 kotlin 代码:

fun main(args: Array<String>) {
   val executor = Executors.newSingleThreadExecutor()
   val futureTask = executor.submit {
       println("I'm Callable task.")
       1 + 1
   }

   println(futureTask.get())
   executor.shutdown() }
Run Code Online (Sandbox Code Playgroud)

输出:

I'm Callable task.
null
Run Code Online (Sandbox Code Playgroud)

什么是正确的语法Future

kotlin completable-future

4
推荐指数
1
解决办法
1596
查看次数

Java 8 CompletableFuture - 如何在同一输入上运行多个函数

我使用以下工作代码CompleteableFuture

CompletableFuture<SomeObject> future = CompletableFuture.
            supplyAsync( () -> f1() ).
            thenApplyAsync( f1Output -> f2( f1Output ) ).
            thenApplyAsync( f2Output -> f3( f2Output ) );
Run Code Online (Sandbox Code Playgroud)

是否有可能运行另一个接收f1Output类似以下内容的未来input?

CompletableFuture<SomeObject> future = CompletableFuture.
            supplyAsync( () -> f1() ).
            thenApplyAsync( f1Output -> f2( f1Output ) ).
            someApiThatRuns( f1Output -> f4( f1Output ) ). // <-- 
            thenApplyAsync( f2Output -> f3( f2Output ) );
Run Code Online (Sandbox Code Playgroud)

如果这简化了事情,人们可以忽略 . 返回的结果f4()

java java-8 completable-future

4
推荐指数
1
解决办法
1671
查看次数

CompletableFuture VS @Async

我英语不好。

我使用异步方法。

选项1

public CompletableFuture<Integer> getDiscountPriceAsync(Integer price) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("supplyAsync");
            return (int)(price * 0.9);
        }, threadPoolTaskExecutor);
    }
Run Code Online (Sandbox Code Playgroud)

选项2

@Async
public CompletableFuture<Integer> getDiscountPriceAsync(Integer price) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("supplyAsync");
            return (int)(price * 0.9);
        }, threadPoolTaskExecutor);
    }
Run Code Online (Sandbox Code Playgroud)

我想知道使用 @Async 和不使用它有什么区别。

我认为第一个Option1提供了足够的异步方法。
但是,像Option2那样使用它是否正确呢?

java asynchronous spring-boot completable-future

4
推荐指数
1
解决办法
5478
查看次数

如何强制 CompletableFuture.thenApply() 在运行前一阶段的同一线程上运行?

这是我面临的问题的简短代码版本:

public static void main(String[] args) {
    CompletableFuture.supplyAsync(() -> {
                /*
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException ignored) {}
                */
                //System.out.println("supplyAsync: " + Thread.currentThread().getName());
                return 1;
            })
            .thenApply(i -> {
                System.out.println("apply: " + Thread.currentThread().getName());
                return i + 1;
            })
            .thenAccept((i) -> {
                System.out.println("accept: " + Thread.currentThread().getName());
                System.out.println("result: " + i);
            }).join();
}
Run Code Online (Sandbox Code Playgroud)

这是我得到的输出:

apply: main
accept: main
result: 2
Run Code Online (Sandbox Code Playgroud)

看到main那里我很惊讶!Thread.sleep()我预计当我取消注释该调用或什至取消注释单个语句时会发生类似的情况sysout

supplyAsync: ForkJoinPool.commonPool-worker-1
apply: ForkJoinPool.commonPool-worker-1
accept: ForkJoinPool.commonPool-worker-1
result: 2
Run Code Online (Sandbox Code Playgroud)

我理解thenApplyAsync()将确保它不会在main线程上运行,但我想避免将供应商从运行的线程返回的数据传递supplyAsync …

java multithreading asynchronous completable-future completion-stage

4
推荐指数
1
解决办法
2920
查看次数

哪个 ExecutorService 最适合阻塞 IO 任务

假设我们有 n 个独立的阻塞 IO 任务,例如对另一台服务器进行休息调用的任务。然后我们需要合并所有答案。每个任务的处理时间可以超过 10 秒。

  1. 我们可以按顺序处理它,最后花费 ~n*10 秒:

    Task1Ans task1 = service1.doSomething();
    Task2Ans task2 = service2.doSomething()
    ...
    return result;
    
    Run Code Online (Sandbox Code Playgroud)
  2. 另一种策略是使用 CompletableFuture 以并行方式处理它,并在所有任务上花费约 10 秒:

    CompletableFuture<Task1Ans> task1Cs = CompletableFuture.supplyAsync(() -> service1.doSomething(), bestExecutor);
    CompletableFuture<Task2Ans> task2Cs = CompletableFuture.supplyAsync(() -> service2.doSomething(), bestExecutor);
    return CompletableFuture.allOf(task1Cs, task2Cs)
       .thenApply(nothing -> {
           ...
           // combine task1, task2 into result object
           return result;
       }).join();
    
    Run Code Online (Sandbox Code Playgroud)

第二种方法有好处,但我不明白哪种类型的线程池最适合此类任务:

ExecutorService bestExecutor = Executors.newFixedThreadPool(30)   /// or Executors.newCachedThreadPool() or Executors.newWorkStealingPool()
Run Code Online (Sandbox Code Playgroud)

我的问题是哪个 ExecutorService 最适合处理 n 并行阻塞 IO 任务。

java executorservice completable-future

4
推荐指数
1
解决办法
2016
查看次数

如何对 CompletableFuture 流使用 join

据说我有以下方法

public static Stream<CompletableFuture<String>> findPricesStream(String product) {}
Run Code Online (Sandbox Code Playgroud)

该方法将寻找给定 a 的最便宜价格product,并返回 CompletableFuture 流。

现在我想在流中的值可用时立即对其做出反应。为此,我采用的方法thenAccept和实现可以如下

    1. public static void reactToEarliestResultWithRaw() {
    2.     long start = System.nanoTime();
    3.     CompletableFuture[] priceFuture = findPricesStream(WHATEVER_PRODUCT_NAME)
    4.             .map(completableFuture -> completableFuture.thenAccept(
    5.                     s -> System.out.println(s + " (done in " + (System.nanoTime() - start) / 1_000_000 + " msecs)")))
    6.             .toArray(CompletableFuture[]::new);
    7.     CompletableFuture.allOf(priceFuture).join();
    8.     System.out.println("All shops have now responded in " + (System.nanoTime() - start) / 1_000_000 + " msecs");
    9. }
Run Code Online (Sandbox Code Playgroud)

通过这个实现,我得到了所需的输出

LetsSaveBig …
Run Code Online (Sandbox Code Playgroud)

java completable-future

4
推荐指数
1
解决办法
4289
查看次数

Java 多线程中的 Thread、Runnable 和 CompletableFuture

我正在尝试在我的 Spring Boot 应用程序中实现多线程。我只是 Java 多线程的初学者,在进行了一些搜索并阅读了各个页面上的文章后,我需要澄清以下几点。所以;

  1. 据我所知,我可以使用Thread,RunnableCompletableFuture来在 Java 应用程序中实现多线程。CompletableFuture似乎是一种更新、更清洁的方式,但Thread可能有更多优点。那么,我应该根据场景坚持使用CompletableFuture还是全部使用?

  2. 基本上我想使用以下方法向同一个服务方法发送 2 个并发请求CompletableFuture

    CompletableFuture<Integer> future1 = fetchAsync(1);
    CompletableFuture<Integer> future2 = fetchAsync(2);
    
    Integer result1 = future1.get();
    Integer result2 = future2.get();
    
    Run Code Online (Sandbox Code Playgroud)

如何同时发送这些请求,然后根据以下条件返回结果:

  • 如果第一个结果不为空,则返回结果并停止进程
  • 如果第一个结果为空,则返回第二个结果并停止进程

我怎样才能做到这一点?我应该用CompletableFuture.anyOf()它吗?

java spring multithreading process completable-future

4
推荐指数
1
解决办法
1299
查看次数

如何将CompletableFuture.supplyAsync与PriorityBlockingQueue一起使用?

我正在尝试通过CompletableFuture.supplyAsync将优先级队列添加到使用ThreadPoolExecutor和LinkedBlockingQueue的现有应用程序.问题是我无法想出一个设计来分配我可以在PriorityBlockingQueue的Comparator中访问的任务优先级.这是因为我的任务被CompletableFuture包装到一个名为AsyncSupply的私有内部类的实例中,该实例隐藏了私有字段中的原始任务.然后使用这些AsteSupply对象作为Runnables调用Comparator,如下所示:

public class PriorityComparator<T extends Runnable> implements Comparator<T> {

    @Override
    public int compare(T o1, T o2) {

        // T is an AsyncSupply object.
        // BUT I WANT SOMETHING I CAN ASSIGN PRIORITIES TOO!
        return 0;
    }
}
Run Code Online (Sandbox Code Playgroud)

我调查了扩展CompletableFuture的可能性,因此我可以将它包装在一个不同的对象中,但很多CompletableFuture被封装且不可用.因此,扩展它似乎不是一种选择.也没有用适配器封装它,因为它实现了非常宽的接口.

除了复制整个CompletableFuture并修改它之外,我不确定如何解决这个问题.有任何想法吗?

java executorservice threadpoolexecutor java-threads completable-future

3
推荐指数
1
解决办法
1440
查看次数