Java并发:在任务池中查找任务失败

Ama*_*ath 2 java concurrency multithreading

SSCCE之后显示从服务器获取记录(比如说).ExecutorService用于创建ThreadPool2个线程,我用Timeout3秒调用所有这些线程.故意我做了一些失败的任务.

现在我的问题是,如何让EmpID的任务失败?

MainClass:

public class MultiThreadEx {
    public static void main(String[] args) {

        String[] empIDArray = {
                "100", "200", "300", "400", "500", 
                "600", "700", "800", "900", "1000", 
                "1100", "1200", "1300", "1400", "1500"
            };

        List<ThreadTaskEach> taskList = new ArrayList<ThreadTaskEach>();
        try {
            for(String empID : empIDArray) {
                taskList.add(new ThreadTaskEach(empID));
            }
        } catch(Exception e) {
            System.out.println("Exception occured: " + e.getMessage());
        }

        List<Future<Map<String, String>>> futureList = null;
        try {
            ExecutorService service = Executors.newFixedThreadPool(2);
            futureList = service.invokeAll(taskList, 3, TimeUnit.SECONDS);
            service.shutdown();
        } catch(InterruptedException ie) {
            System.out.println("Exception occured: " + ie.getMessage());
        }

        for(Future<Map<String, String>> future : futureList) {
            try {
                Map<String, String> resultMap = future.get();

                for(String key : resultMap.keySet()) {
                    System.out.println(resultMap.get(key));
                }

            } catch(ExecutionException ee) {
                System.out.println("Exception occured: " + ee.getMessage());
            } catch (InterruptedException ie) {
                System.out.println("Exception occured: " + ie.getMessage());
            } catch(CancellationException e) {
                System.out.println("Exception occured: " + e.getMessage());
            }
        }

    }
}
Run Code Online (Sandbox Code Playgroud)

线程类

class ThreadTaskEach implements Callable<Map<String, String>>{

    private String empID;

    public ThreadTaskEach(String empID) {
        this.empID = empID;
    }

    @Override
    public Map<String, String> call() throws Exception {
        try {
            return prepareMap(empID);
        } catch(Exception e) {
            System.out.println("Exception occured: " + e.getMessage());
            throw new Exception("Exception occured: " + e.getMessage());
        }
    }

    private Map<String, String> prepareMap(String empID) throws InterruptedException {
        Map<String, String> map = new HashMap<String, String>();

        if(Integer.parseInt(empID) % 500 == 0) {
            Thread.sleep(5000);
        }

        map.put(empID, empID + ": " + Thread.currentThread().getId());

        return map;
    }
}
Run Code Online (Sandbox Code Playgroud)

在上面的代码500中,1000 ..未能在3秒内完成.

Map<String, String> resultMap = future.get();
Run Code Online (Sandbox Code Playgroud)

所以当我说Future.get()来完成这些任务时CancellationException.但是如何从任务中获得EmpID?

ass*_*ias 6

invokeAll您可以submit逐个使用每个任务,而不是使用每个任务,并将每个未来存储在地图中:

Future<?> f = executor.submit(task);
map.put(f, task.getId());
Run Code Online (Sandbox Code Playgroud)

现在,当您尝试获取时,如果您有异常,则可以使用地图返回到id.但是,您需要在每个上面设置超时,future.get()这可能对您的用例不实用.

另一种方法是使用invokeAll 哪种保证的规范以与提交的任务相同的顺序返回期货.

返回表示任务的Futures列表,其顺序与迭代器为给定任务列表生成的顺序相同.

只要您使用List,迭代顺序就是固定的,您可以匹配这两个列表:

    for (int i = 0; i < futureList.size(); i++) {
        Future<Map<String, String>> future = futureList.get(i)
        try {
            Map<String, String> resultMap = future.get();

            for(String key : resultMap.keySet()) {
                System.out.println(resultMap.get(key));
            }
        } catch(ExecutionException ee) {
            System.out.println("Exception in task " + taskList.get(i).getId());
        }
    }
Run Code Online (Sandbox Code Playgroud)

只需确保使用具有稳定迭代顺序的集合(ArrayList很好).