我正在尝试使用a CloseableHttpAsyncClient从端点读取,将字符串编组为Object(使用javax.json),然后将对象上的数组转换为其各个组件:
CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().setDefaultCredentialsProvider(provider).build();
client.start();
Observable<ObservableHttpResponse> observable = ObservableHttp.createRequest(HttpAsyncMethods.createGet(uri), client)
.toObservable();
Observable<JsonArray> shareable = observable.flatMap(response -> response.getContent().map(bb -> {
String stringVal = new String(bb);
StringReader reader = new StringReader(stringVal);
JsonObject jobj = Json.createReader(reader).readObject();
return jobj.getJsonArray("elements");
})).share();
Run Code Online (Sandbox Code Playgroud)
我需要获取Json数组,然后过滤数组的对象:
Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1"));
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2"));
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3"));
Run Code Online (Sandbox Code Playgroud)
我如何将其Observable<JsonArray>转换为ObservableJsonObject>?
因为它是异步的,我不能使用forEach来创建某种数组来缓冲数据.
因此,使用CloseableHttpAsyncClient可能不是我想要实现的最好的解决方案 - 我今天早上(在所有事情的淋浴中)意识到我正在尝试异步处理数据然后进行异步调用.
理想情况下,调用CloseableHttpClient(sync)并将数据传递给Observable进行过滤将是一种更理想的方法(我不需要第一次调用来管理多个http调用).
CloseableHttpClient client = HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build();
StringBuffer result = new StringBuffer();
try { …Run Code Online (Sandbox Code Playgroud)