CompletableFuture for child requests

O-O*_*F-N 5 java java-8 asynchttpclient completable-future

I am trying to understand CompletableFuture in Java 8. As part of that, I am making some REST calls to solidify my understanding. I am using this library to make the REST calls: https://github.com/AsyncHttpClient/async-http-client.

Note that this library returns a Response object for a GET call.

Here is what I am trying to do:

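  1. Call this URL, which returns a list of users: https://jsonplaceholder.typicode.com/users
  2. Convert the response to a list of User objects using GSON.
  3. Iterate over each User object in the list, get its userId, and then fetch the list of posts made by that user from a URL such as: https://jsonplaceholder.typicode.com/posts?userId=1
  4. Convert each posts response to Post objects using GSON.
  5. Build a collection of UserPosts objects, each holding a User object and the list of posts made by that user.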

    public class UserPosts {

        private final User user;
        private final List<Post> posts;

        public UserPosts(User user, List<Post> posts) {
            this.user = user;
            this.posts = posts;
        }

        @Override
        public String toString() {
            return "user = " + this.user + " \n" + "post = " + posts + " \n \n";
        }
    }

My current implementation is as follows:

package com.CompletableFuture;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.asynchttpclient.Response;

import com.http.HttpResponse;
import com.http.HttpUtil;
import com.model.Post;
import com.model.User;
import com.model.UserPosts;

/**
 * Created by vm on 8/20/18.
 */

class UserPostResponse {
    private final User user;
    private final Future<Response> postResponse;

    UserPostResponse(User user, Future<Response> postResponse) {
        this.user = user;
        this.postResponse = postResponse;
    }

    public User getUser() {
        return user;
    }

    public Future<Response> getPostResponse() {
        return postResponse;
    }
}

public class HttpCompletableFuture extends HttpResponse {
    private Function<Future<Response>, List<User>> userResponseToObject = user -> {
        try {
            return super.convertResponseToUser(Optional.of(user.get().getResponseBody())).get();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    };
    private Function<Future<Response>, List<Post>> postResponseToObject = post -> {
        try {
            return super.convertResponseToPost(Optional.of(post.get().getResponseBody())).get();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    };

    private Function<UserPostResponse, UserPosts> buildUserPosts = (userPostResponse) -> {
        try {
            return new UserPosts(userPostResponse.getUser(), postResponseToObject.apply(userPostResponse.getPostResponse()));
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    };

    private Function<User, UserPostResponse> getPostResponseForUser = user -> {
        Future<Response> resp = super.getPostsForUser(user.getId());
        return new UserPostResponse(user, resp);
    };

    public HttpCompletableFuture() {
        super(HttpUtil.getInstance());
    }

    public List<UserPosts> getUserPosts() {

        try {
            CompletableFuture<List<UserPosts>> usersFuture = CompletableFuture
                    .supplyAsync(() -> super.getUsers())
                    .thenApply(userResponseToObject)
                    .thenApply((List<User> users)-> users.stream().map(getPostResponseForUser).collect(Collectors.toList()))
                    .thenApply((List<UserPostResponse> userPostResponses ) -> userPostResponses.stream().map(buildUserPosts).collect(Collectors.toList()));

            List<UserPosts> users = usersFuture.get();
            System.out.println(users);
            return users;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

}

However, I am not sure whether the way I am doing this is correct. More specifically, in the userResponseToObject and postResponseToObject functions I call get() on the Future, which blocks.

Is there a better way to implement this?

Mis*_*sha 3

If you are going to use CompletableFuture, you should use the ListenableFuture from the async-http-client library. A ListenableFuture can be converted to a CompletableFuture.
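To illustrate what such a conversion involves in principle, here is a minimal sketch that adapts a callback-style source to a CompletableFuture using only the JDK. The `CallbackSource` interface and the `CallbackAdapter` class are hypothetical stand-ins invented for this example; they are not part of async-http-client. With the real library you would simply call `toCompletableFuture()` on the ListenableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class CallbackAdapter {

    /** Hypothetical callback-style API: calls back with (result, error), one of them null. */
    interface CallbackSource<T> {
        void onComplete(BiConsumer<T, Throwable> callback);
    }

    /** Bridges the callback into a CompletableFuture without blocking any thread. */
    static <T> CompletableFuture<T> toCompletableFuture(CallbackSource<T> source) {
        CompletableFuture<T> future = new CompletableFuture<>();
        source.onComplete((result, error) -> {
            if (error != null) {
                future.completeExceptionally(error); // propagate the failure
            } else {
                future.complete(result);             // propagate the value
            }
        });
        return future;
    }
}
```

The point is that nothing calls `get()`: the future is completed from inside the callback, so downstream stages run only once the result is available.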

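The advantage of using CompletableFuture is that you can write the logic that handles Response objects without needing to know anything about futures or threads. Suppose you have written the following 4 methods: 2 that issue requests and 2 that parse responses: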

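// Issues the request for the list of users (body omitted).
ListenableFuture<Response> requestUsers() {

}

// Issues the request for the posts of the given user (body omitted).
ListenableFuture<Response> requestPosts(User u) {

}

// Parses the users response into User objects (body omitted).
List<User> parseUsers(Response r) {

}

// Parses the posts response for user u into UserPost objects (body omitted).
List<UserPost> parseUserPosts(Response r, User u) {

}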

Now we can write a non-blocking method that retrieves the posts for a given user:

CompletableFuture<List<UserPost>> userPosts(User u) {
    return requestPosts(u)
        .toCompletableFuture()
        .thenApply(r -> parseUserPosts(r, u));
}
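To see why `thenApply` does not block, the following JDK-only sketch may help. It assumes a plain `String` in place of the library's Response and a trivial hypothetical parser (`parseLength`); the names `ThenApplyDemo` and `parseWhenReady` are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class ThenApplyDemo {

    // Hypothetical "parser": plain logic that knows nothing about futures or threads.
    static int parseLength(String body) {
        return body.length();
    }

    // Registers the parser to run once the response arrives; returns immediately.
    static CompletableFuture<Integer> parseWhenReady(CompletableFuture<String> response) {
        return response.thenApply(ThenApplyDemo::parseLength);
    }

    public static void main(String[] args) {
        CompletableFuture<String> response = new CompletableFuture<>(); // simulated pending response
        CompletableFuture<Integer> parsed = parseWhenReady(response);

        System.out.println(parsed.isDone()); // prints "false": nothing has been parsed yet

        response.complete("[]");             // the "response" arrives
        System.out.println(parsed.join());   // prints "2"
    }
}
```

The parsing function is ordinary synchronous code; the future decides when it runs.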

And a blocking method that reads all posts for all users:

List<UserPost> getAllPosts() {
    // issue all requests
    List<CompletableFuture<List<UserPost>>> postFutures = requestUsers()
            .toCompletableFuture()
            .thenApply(userRequest -> parseUsers(userRequest)
                    .stream()
                    .map(this::userPosts)
                    .collect(toList())
            ).join();


    // collect the results
    return postFutures.stream()
            .map(CompletableFuture::join)
            .flatMap(List::stream)
            .collect(toList());
}
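If you would rather not block at all, one option is to return a future from the top-level method too, combining `thenCompose` with `CompletableFuture.allOf`. This is only a sketch under stated assumptions: `fetchUserIds` and `fetchPosts` are hypothetical stand-ins that simulate the HTTP calls with `supplyAsync`, and users and posts are reduced to integers and strings so the example runs with the JDK alone.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class NonBlockingPosts {

    // Hypothetical stand-in for requestUsers() + parseUsers(): yields three user ids.
    static CompletableFuture<List<Integer>> fetchUserIds() {
        return CompletableFuture.supplyAsync(() -> Arrays.asList(1, 2, 3));
    }

    // Hypothetical stand-in for userPosts(User): yields two fake posts per user.
    static CompletableFuture<List<String>> fetchPosts(int userId) {
        return CompletableFuture.supplyAsync(
                () -> Arrays.asList("post-" + userId + "-a", "post-" + userId + "-b"));
    }

    // Turns a list of futures into one future of the flattened results, in list order.
    static <T> CompletableFuture<List<T>> flatten(List<CompletableFuture<List<T>>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // safe here: allOf guarantees completion
                        .flatMap(List::stream)
                        .collect(Collectors.toList()));
    }

    // Fully asynchronous: the caller decides whether and when to wait.
    static CompletableFuture<List<String>> allPosts() {
        return fetchUserIds().thenCompose(ids -> {
            List<CompletableFuture<List<String>>> futures = ids.stream()
                    .map(NonBlockingPosts::fetchPosts) // issue all requests
                    .collect(Collectors.toList());
            return flatten(futures);
        });
    }

    public static void main(String[] args) {
        // Blocking only at the very edge of the program, to print the result.
        System.out.println(allPosts().join());
    }
}
```

The only `join()` that can actually wait is the one in `main`; the one inside `flatten` runs after `allOf` has completed, so it returns immediately.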