roc*_*cky 10 java multithreading
我需要找到一种在java中并行执行任务(依赖和独立)的方法.
我检查了java.util.concurrent Future和Fork/Join,但看起来我们无法向Task添加依赖项.
任何人都可以指出我更正Java API.
Sni*_*192 11
在Scala中这很容易做到,我认为你最好使用Scala.这是我从这里开始的一个例子http://danielwestheide.com/(新手的Scala指南第16部分:从哪里开始)这个人有一个很棒的博客(我不是那个人)
让我们一起喝咖啡吧.要做的任务是:
或作为一棵树:
Grind   _
Coffe    \
          \   
Heat    ___\_Brew____ 
Water                \_____Combine
                     /
Foam    ____________/
Milk
在使用并发api的java中,这将是:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Barrista {
    static class HeatWater implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Heating Water");
            Thread.sleep(1000);
            return "hot water";
        }
    }
    static class GrindBeans implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Grinding Beans");
            Thread.sleep(2000);
            return "grinded beans";
        }
    }
    static class Brew implements Callable<String> {
        final Future<String> grindedBeans;
        final Future<String> hotWater;
        public Brew(Future<String> grindedBeans, Future<String> hotWater) {
            this.grindedBeans = grindedBeans;
            this.hotWater = hotWater;
        }
        @Override
        public String call() throws Exception
        {
            System.out.println("brewing coffee with " + grindedBeans.get()
                    + " and " + hotWater.get());
            Thread.sleep(1000);
            return "brewed coffee";
        }
    }
    static class FrothMilk implements Callable<String> {
        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            return "some milk";
        }
    }
    static class Combine implements Callable<String> {
        public Combine(Future<String> frothedMilk, Future<String> brewedCoffee) {
            super();
            this.frothedMilk = frothedMilk;
            this.brewedCoffee = brewedCoffee;
        }
        final Future<String> frothedMilk;
        final Future<String> brewedCoffee;
        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            System.out.println("Combining " + frothedMilk.get() + " "
                    + brewedCoffee.get());
            return "Final Coffee";
        }
    }
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        FutureTask<String> heatWaterFuture = new FutureTask<String>(new HeatWater());
        FutureTask<String> grindBeans = new FutureTask<String>(new GrindBeans());
        FutureTask<String> brewCoffee = new FutureTask<String>(new Brew(grindBeans, heatWaterFuture));
        FutureTask<String> frothMilk = new FutureTask<String>(new FrothMilk());
        FutureTask<String> combineCoffee = new FutureTask<String>(new Combine(frothMilk, brewCoffee));
        executor.execute(heatWaterFuture);
        executor.execute(grindBeans);
        executor.execute(brewCoffee);
        executor.execute(frothMilk);
        executor.execute(combineCoffee);
        try {
            /**
             *  Warning this code is blocking !!!!!!!
             */         
            System.out.println(combineCoffee.get(20, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            System.out.println("20 SECONDS FOR A COFFEE !!!! I am !@#! leaving!!");
            e.printStackTrace();
        } finally{
                executor.shutdown();
            }
        }
    }
确保您添加超时以确保您的代码不会永远等待完成某些事情,这可以通过使用Future.get(long,TimeUnit)完成,然后相应地处理失败.
它在scala中更好,但是,它就像在博客上一样:准备一些咖啡的代码看起来像这样:
def prepareCappuccino(): Try[Cappuccino] = for {
  ground <- Try(grind("arabica beans"))
  water <- Try(heatWater(Water(25)))
  espresso <- Try(brew(ground, water))
  foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)
所有方法返回未来(键入的未来),例如grind将是这样的:
def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
   // grinding function contents
}
对于所有实现,请查看博客,但这就是它的全部内容.您也可以轻松集成Scala和Java.我真的建议在Scala而不是Java中做这种事情.Scala需要更少的代码,更清晰和事件驱动.
您需要的是CountDownLatch。
final CountDownLatch gate = new CountDownLatch(2);
// thread a
new Thread() {
    public void run() {
        // process
        gate.countDown();
    }
}.start();
// thread c
new Thread() {
    public void run() {
        // process
        gate.countDown();
    }
}.start();
new Thread() {
    public void run() {
        try {
            gate.await();
            // both thread a and thread c have completed
            // process thread b
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}.start();
作为替代方案,根据您的场景,您也可以使用 BlockingQueue来实现生产者-消费者模式。请参阅文档页面上的示例。
| 归档时间: | 
 | 
| 查看次数: | 6915 次 | 
| 最近记录: |