use*_*870 5 java json asynchronous reactive-programming rx-java
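I am trying to use a CloseableHttpAsyncClient to read from an endpoint, unmarshal the string into an object (using javax.json), and then split an array on that object into its individual elements: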
CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().setDefaultCredentialsProvider(provider).build();
client.start();
Observable<ObservableHttpResponse> observable = ObservableHttp.createRequest(HttpAsyncMethods.createGet(uri), client)
.toObservable();
Observable<JsonArray> shareable = observable.flatMap(response -> response.getContent().map(bb -> {
String stringVal = new String(bb);
StringReader reader = new StringReader(stringVal);
JsonObject jobj = Json.createReader(reader).readObject();
return jobj.getJsonArray("elements");
})).share();
I need to take the JSON array and then filter its objects:
Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1"));
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2"));
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3"));
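How do I convert the Observable<JsonArray> into an Observable<JsonObject>?
Because it is asynchronous, I cannot use forEach to build some kind of array to buffer the data.
So using CloseableHttpAsyncClient may not be the best solution for what I am trying to achieve. I realized this morning (in the shower, of all places) that I was trying to process the data asynchronously in order to then make asynchronous calls.
Ideally, calling a CloseableHttpClient (sync) and passing the data to an Observable for filtering would be a better approach (I do not need the first call to manage multiple HTTP calls).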
CloseableHttpClient client = HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build();
StringBuffer result = new StringBuffer();
try {
HttpGet request = new HttpGet(uri);
HttpResponse response = client.execute(request);
BufferedReader rd = new BufferedReader(
new InputStreamReader(response.getEntity().getContent()));
String line;
while ((line = rd.readLine()) != null) {
result.append(line);
}
} catch(ClientProtocolException cpe) { } catch(IOException ioe) { }
StringReader reader = new StringReader(result.toString());
JsonObject jobj = Json.createReader(reader).readObject();
JsonArray elements = jobj.getJsonArray("elements");
List<JsonObject> objects = elements.getValuesAs(JsonObject.class);
Observable<JsonObject> shareable = Observable.from(objects).share();
Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1"));
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2"));
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3"));
firstStream.subscribe(record -> {
//connect to SOTS/Facebook and store the results
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Json.createWriter(baos).writeObject(record);
System.out.println(baos.toString());
});
secondStream.subscribe(record -> {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Json.createWriter(baos).writeObject(record);
System.out.println(baos.toString());
});
thirdStream.subscribe(record -> {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Json.createWriter(baos).writeObject(record);
System.out.println(baos.toString());
});
Try this code:
String myjson = "{\"elements\": [{\"text\":\"Obj1\"},{\"text\":\"Obj2\"}, {\"text\":\"Obj3\"}]}";

Observable.just(myjson)
        // use the lambda parameter rather than capturing the outer variable
        .map(jsonStr -> new StringReader(jsonStr))
        .map(reader -> Json.createReader(reader).readObject())
        .map(jobj -> jobj.getJsonArray("elements"))
        .map(elements -> elements.toArray(new JsonObject[elements.size()]))
        .flatMap(jsonObjects -> Observable.from(jsonObjects))
        .subscribe(
                (jsonObject) -> System.out.println(jsonObject.getString("text")),
                throwable -> throwable.printStackTrace(),
                () -> System.out.println("On complete"));

Result:

07-22 12:19:22.362 8032-8032/com.mediamanagment.app I/System.out﹕ Obj1
07-22 12:19:22.362 8032-8032/com.mediamanagment.app I/System.out﹕ Obj2
07-22 12:19:22.362 8032-8032/com.mediamanagment.app I/System.out﹕ Obj3
Note:
You should use this dependency:

compile 'org.glassfish:javax.json:1.0.4'

instead of this one:

compile 'javax.json:javax.json-api:1.0'

If you use 'javax.json:javax.json-api:1.0', which contains only the API and no implementation, you will get javax.json.JsonException: Provider org.glassfish.json.JsonProviderImpl not found at this step:

.map(reader -> Json.createReader(reader).readObject())

So please use 'org.glassfish:javax.json:1.0.4'.
Update:
Also, instead of

.flatMap(jsonObjects -> Observable.from(jsonObjects))

you can use flatMapIterable(). It expects an Iterable rather than an array, so the preceding map step would need to produce a List (for example with Arrays.asList, or with elements.getValuesAs(JsonObject.class) as in the question):

.flatMapIterable(jsonObjects -> jsonObjects)
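To make the "flatten first, then filter by type" shape concrete without needing RxJava or javax.json on the classpath, here is a minimal stand-in sketch using only java.util.stream. It is not the RxJava code itself: Stream.flatMap plays the role of flatMapIterable, a Map<String, String> stands in for JsonObject, and the class name FlattenSketch, the helpers item and sample, and the sample data are all made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Stand-in sketch: java.util.stream instead of RxJava,
// Map<String, String> instead of javax.json.JsonObject.
final class FlattenSketch {

    // Flattens a list of batches (the "array" level) into single records
    // and keeps only the records whose "type" field matches.
    static List<Map<String, String>> ofType(List<List<Map<String, String>>> batches, String type) {
        return batches.stream()
                .flatMap(List::stream)                        // one batch -> many records
                .filter(rec -> type.equals(rec.get("type")))  // same predicate shape as in the question
                .collect(Collectors.toList());
    }

    // Hypothetical helper: builds one record with a type and a text field.
    static Map<String, String> item(String type, String text) {
        Map<String, String> m = new HashMap<>();
        m.put("type", type);
        m.put("text", text);
        return m;
    }

    // Hypothetical sample data: a single batch holding three records.
    static List<List<Map<String, String>>> sample() {
        return Arrays.asList(Arrays.asList(
                item("TYPE_1", "Obj1"),
                item("TYPE_2", "Obj2"),
                item("TYPE_1", "Obj3")));
    }

    public static void main(String[] args) {
        for (Map<String, String> rec : ofType(sample(), "TYPE_1")) {
            System.out.println(rec.get("text"));
        }
    }
}
```

In RxJava terms, the flatMap line corresponds to calling flatMapIterable on the Observable<JsonArray>, and the filter line is the same filter the question applies to each of the three streams. The key point is that the filter only type-checks once the stream carries individual objects rather than the whole array.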