Executing methods in parallel from within a call method

joh*_*ohn 16 java multithreading callable thread-safety executorservice

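I have a library that customers use. They pass in a DataRequest object, which holds a userid, a timeout and some other fields. I use this DataRequest object to build a URL, make an HTTP call with RestTemplate, and my service returns a JSON response. I use that response to build a DataResponse object and return it to the customer.

Below is my DataClient class, which customers call by passing a DataRequest object to it. I use the timeout value the customer supplies in the DataRequest to time out the request if it takes too long in the getSyncData method.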

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // first executor
    private ExecutorService service = Executors.newFixedThreadPool(15);

    @Override
    public DataResponse getSyncData(DataRequest key) {
        DataResponse response = null;
        Future<DataResponse> responseFuture = null;

        try {
            responseFuture = getAsyncData(key);
            response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
        } catch (TimeoutException ex) {
            response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
            responseFuture.cancel(true);
            // logging exception here               
        }

        return response;
    }   

    @Override
    public Future<DataResponse> getAsyncData(DataRequest key) {
        DataFetcherTask task = new DataFetcherTask(key, restTemplate);
        Future<DataResponse> future = service.submit(task);

        return future;
    }
}

DataFetcherTask class:

public class DataFetcherTask implements Callable<DataResponse> {

    private DataRequest key;
    private RestTemplate restTemplate;

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() throws Exception {
        // In a nutshell below is what I am doing here. 
        // 1. Make an url using DataRequest key.
        // 2. And then execute the url using RestTemplate.
        // 3. Make a DataResponse object and return it.

        // I am calling this whole logic in call method as LogicA
    }
}

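As of now, my DataFetcherTask class is responsible for a single DataRequest key, as shown above.

Problem statement:

Now I have a small design change. The customer will pass a DataRequest object (for example keyA) to my library. I will then make a new HTTP call to another service (which I am not doing in my current design), using the user id in the DataRequest (keyA) object. That call returns a list of user ids, and I will create one further DataRequest object (keyB, keyC, keyD) for each user id in the response. I will then have a List<DataRequest> containing the keyB, keyC and keyD DataRequest objects. The List<DataRequest> will hold at most three elements.

For each DataRequest object in the List<DataRequest>, I want to execute the DataFetcherTask.call method above in parallel, and then build a List<DataResponse> by adding the DataResponse for each key. So I will have three parallel calls to DataFetcherTask.call. The idea behind the parallel calls is to get the data for all of those (at most three) keys within the same global timeout value.

So my proposal is that the DataFetcherTask class returns a List<DataResponse> object instead of a DataResponse, and the signatures of the getSyncData and getAsyncData methods change accordingly. Here is the algorithm:

  • Use the DataRequest object passed by the customer to build a List<DataRequest> by calling another HTTP service.
  • Make a parallel call to the DataFetcherTask.call method for each DataRequest in the List<DataRequest>, and return a List<DataResponse> object to the customer instead of a DataResponse.

This way, I can apply the same global timeout to step 1 as well as step 2. If either of the steps above takes too long, we simply time out in the getSyncData method.

DataFetcherTask class after the design change: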
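As a rough illustration of the two steps sharing one deadline, here is a minimal, self-contained sketch. It is not the library's real code: `GlobalTimeoutSketch`, `fetchAll`, `generateKeys` and `fetch` are hypothetical names, plain strings stand in for DataRequest and DataResponse, and both HTTP calls are simulated. It assumes a single pool and relies on `ExecutorService.invokeAll` with whatever time is left after step 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class GlobalTimeoutSketch {

    // Hypothetical stand-in for step 1: the HTTP call that expands one key into up to three.
    static List<String> generateKeys(String key) {
        return Arrays.asList(key + "-B", key + "-C", key + "-D");
    }

    // Hypothetical stand-in for step 2 ("LogicA"): one HTTP call per key.
    static String fetch(String key) {
        return "response:" + key;
    }

    static List<String> fetchAll(ExecutorService pool, String key, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);

        // Step 1 runs on the pool so that it can be timed out like any other task.
        Future<List<String>> keysFuture = pool.submit(() -> generateKeys(key));
        List<String> keys;
        try {
            keys = keysFuture.get(timeout, unit);
        } catch (TimeoutException e) {
            keysFuture.cancel(true);
            throw e;
        }

        // Step 2: one task per generated key, all submitted to the same pool.
        List<Callable<String>> tasks = new ArrayList<>();
        for (String k : keys) {
            tasks.add(() -> fetch(k));
        }

        // invokeAll waits at most the remaining time and cancels unfinished tasks.
        long remainingNanos = Math.max(0L, deadline - System.nanoTime());
        List<Future<String>> futures = pool.invokeAll(tasks, remainingNanos, TimeUnit.NANOSECONDS);

        List<String> responses = new ArrayList<>();
        for (Future<String> future : futures) {
            if (future.isCancelled()) {
                throw new TimeoutException("global timeout exceeded while fetching keys");
            }
            responses.add(future.get());
        }
        return responses;
    }
}
```

Because `fetchAll` is called from the caller's thread rather than from inside a pool task, no task ever waits on another task in the same pool, so one executor is enough in this sketch.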

public class DataFetcherTask implements Callable<List<DataResponse>> {

    private DataRequest key;
    private RestTemplate restTemplate;
    // second executor here
    private ExecutorService executorService = Executors.newFixedThreadPool(10);

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public List<DataResponse> call() throws Exception {
        List<DataRequest> keys = generateKeys();
        CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService);

        int count = 0;
        for (final DataRequest key : keys) {
            comp.submit(new Callable<DataResponse>() {
                @Override
                public DataResponse call() throws Exception {
                    return performDataRequest(key);
                }
            });
            count++; // track how many tasks were submitted so the loop below collects them all
        }

        List<DataResponse> responseList = new ArrayList<DataResponse>();
        while (count-- > 0) {
            Future<DataResponse> future = comp.take();
            responseList.add(future.get());
        }
        return responseList;
    }

    // In this method I am making a HTTP call to another service
    // and then I will make List<DataRequest> accordingly.
    private List<DataRequest> generateKeys() {
        List<DataRequest> keys = new ArrayList<>();
        // use key object which is passed in constructor to make HTTP call to another service
        // and then make List of DataRequest object and return keys.
        return keys;
    }       

    private DataResponse performDataRequest(DataRequest key) {
        // This will have all LogicA code here which is shown in my original design.
        // everything as it is same..
    }
}
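
Now my questions are:

  • Does it have to be like this? What is the right design for this problem? Having a call method inside another call method looks weird to me.
  • Do we need two executors, as I have in my code? Is there a better way to solve this, or any simplification or design change we can make here?

I have simplified the code to make clear what I am trying to do.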

Ste*_*stl 4

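As already mentioned in the comments on your question, you can use Java's ForkJoin framework. This saves you the extra thread pool (the second executor) inside your DataFetcherTask.

You simply use a ForkJoinPool in your DataClient and convert your DataFetcherTask into a RecursiveTask (one of the subtypes of ForkJoinTask). This lets you easily execute further subtasks in parallel.

After these modifications, your code will look something like this:

DataFetcherTask

The DataFetcherTask is now a RecursiveTask that first generates the keys and then invokes a subtask for each generated key. These subtasks are executed in the same ForkJoinPool as the parent task.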

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {

  private final DataRequest key;
  private final RestTemplate restTemplate;

  public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
      this.key = key;
      this.restTemplate = restTemplate;
  }

  @Override
  protected List<DataResponse> compute() {
    // Create subtasks for the key and invoke them
    List<DataRequestTask> requestTasks = requestTasks(generateKeys());
    invokeAll(requestTasks);

    // All tasks are finished if invokeAll() returns.
    List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
    for (DataRequestTask task : requestTasks) {
      try {
        responseList.add(task.get());
      } catch (InterruptedException | ExecutionException e) {
        // TODO - Handle exception properly
        Thread.currentThread().interrupt();
        return Collections.emptyList();
      }
    }

    return responseList;
  }

  private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
    List<DataRequestTask> tasks = new ArrayList<>(keys.size());
    for (DataRequest key : keys) {
      tasks.add(new DataRequestTask(key));
    }

    return tasks;
  }

  // In this method I am making a HTTP call to another service
  // and then I will make List<DataRequest> accordingly.
  private List<DataRequest> generateKeys() {
      List<DataRequest> keys = new ArrayList<>();
      // use key object which is passed in constructor to make HTTP call to another service
      // and then make List of DataRequest object and return keys.
      return keys;
  }

  /** Inner class for the subtasks. */
  private static class DataRequestTask extends RecursiveTask<DataResponse> {

    private final DataRequest request;

    public DataRequestTask(DataRequest request) {
      this.request = request;
    }

    @Override
    protected DataResponse compute() {
      return performDataRequest(this.request);
    }

    private DataResponse performDataRequest(DataRequest key) {
      // This will have all LogicA code here which is shown in my original design.
      // everything as it is same..
      return new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK);
    }
  }

}

DataClient

The DataClient does not change much, apart from the new thread pool:

public class DataClient implements Client {

  private final RestTemplate restTemplate = new RestTemplate();
  // Replace the ExecutorService with a ForkJoinPool
  private final ForkJoinPool service = new ForkJoinPool(15);

  @Override
  public List<DataResponse> getSyncData(DataRequest key) {
      List<DataResponse> responseList = null;
      Future<List<DataResponse>> responseFuture = null;

      try {
          responseFuture = getAsyncData(key);
          responseList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
      } catch (TimeoutException | ExecutionException | InterruptedException ex) {
          responseList = Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
          responseFuture.cancel(true);
          // logging exception here
      }

      return responseList;
  }

  @Override
  public Future<List<DataResponse>> getAsyncData(DataRequest key) {
      DataFetcherTask task = new DataFetcherTask(key, this.restTemplate);
      return this.service.submit(task);
  }
}
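To try the ForkJoin approach in isolation, a stripped-down runnable sketch might look like the following. Everything in it is a placeholder: `ForkJoinSketch`, `FetchOne`, `FetchAll` and `run` are hypothetical names, strings replace DataRequest and DataResponse, and no HTTP call is made. It also uses `join()` rather than `get()` on the subtasks, which avoids the checked exceptions once `invokeAll()` has returned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

class ForkJoinSketch {

    /** Subtask: stands in for one request ("LogicA") against a single key. */
    static class FetchOne extends RecursiveTask<String> {
        private final String key;

        FetchOne(String key) {
            this.key = key;
        }

        @Override
        protected String compute() {
            return "response:" + key; // simulated response, no real HTTP call
        }
    }

    /** Parent task: generates the keys, then runs one subtask per key in the same pool. */
    static class FetchAll extends RecursiveTask<List<String>> {
        private final String key;

        FetchAll(String key) {
            this.key = key;
        }

        @Override
        protected List<String> compute() {
            List<FetchOne> subtasks = new ArrayList<>();
            // Simulated key generation (the extra HTTP call in the question).
            for (String k : Arrays.asList(key + "-B", key + "-C", key + "-D")) {
                subtasks.add(new FetchOne(k));
            }
            invokeAll(subtasks); // returns once every subtask has completed

            List<String> responses = new ArrayList<>();
            for (FetchOne subtask : subtasks) {
                responses.add(subtask.join()); // already done, so this does not block
            }
            return responses;
        }
    }

    /** Submits the parent task and waits for it under a single timeout. */
    static List<String> run(String key, long timeout, TimeUnit unit) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            ForkJoinTask<List<String>> task = pool.submit(new FetchAll(key));
            return task.get(timeout, unit);
        } finally {
            pool.shutdown();
        }
    }
}
```

In a real client the pool would be a long-lived field, as in the DataClient above, rather than being created per call as in this sketch.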

Once you are on Java 8, you might consider changing the implementation to CompletableFutures. It would then look something like this:

DataClientCF

public class DataClientCF {

  private final RestTemplate restTemplate = new RestTemplate();
  private final ExecutorService executor = Executors.newFixedThreadPool(15);

  public List<DataResponse> getData(DataRequest initialKey) {
    return CompletableFuture.supplyAsync(() -> generateKeys(initialKey), this.executor)
      .thenApply(requests -> requests.stream().map(this::supplyRequestAsync).collect(Collectors.toList()))
      .thenApply(responseFutures -> responseFutures.stream().map(future -> future.join()).collect(Collectors.toList()))
      .exceptionally(t -> { throw new RuntimeException(t); })
      .join();
  }

  private List<DataRequest> generateKeys(DataRequest key) {
    return new ArrayList<>();
  }

  private CompletableFuture<DataResponse> supplyRequestAsync(DataRequest key) {
    return CompletableFuture.supplyAsync(() -> new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK), this.executor);
  }
}
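One thing the CompletableFuture version above leaves open is the client-supplied timeout, since `join()` waits indefinitely. A possible way to add it on Java 8 is sketched below, under several assumptions: `TimedCompletableFutureSketch`, `fetchAllAsync`, `fetchAll`, `generateKeys` and `fetch` are hypothetical names, strings stand in for the request and response types, and the HTTP calls are simulated. The sketch also composes the stages with `thenCompose` and `CompletableFuture.allOf`, so no pool thread sits blocked in `join()` while the per-key requests are still running.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

class TimedCompletableFutureSketch {

    // Hypothetical stand-in for the key-generating HTTP call.
    static List<String> generateKeys(String key) {
        return Arrays.asList(key + "-B", key + "-C", key + "-D");
    }

    // Hypothetical stand-in for the per-key HTTP call.
    static String fetch(String key) {
        return "response:" + key;
    }

    static CompletableFuture<List<String>> fetchAllAsync(String key, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> generateKeys(key), executor)
            .thenCompose(keys -> {
                // Start one asynchronous request per generated key.
                List<CompletableFuture<String>> futures = keys.stream()
                    .map(k -> CompletableFuture.supplyAsync(() -> fetch(k), executor))
                    .collect(Collectors.toList());
                // allOf completes when every request is done, so join() below returns at once.
                return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                    .thenApply(done -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
            });
    }

    // Blocks the caller for at most the given timeout, covering both steps.
    static List<String> fetchAll(String key, ExecutorService executor, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return fetchAllAsync(key, executor).get(timeout, unit);
    }
}
```

Note that a timeout in `get` only stops the caller from waiting. `CompletableFuture.cancel` does not interrupt work that is already running, so the underlying HTTP calls would still need their own connect and read timeouts.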

As mentioned in the comments, Guava's ListenableFutures would provide similar functionality for Java 7, but without lambdas they tend to get clumsy.