小编use*_*870的帖子

将字符串转换为数组到Observable中的对象

我正在尝试使用a CloseableHttpAsyncClient从端点读取,将字符串编组为Object(使用javax.json),然后将对象上的数组转换为其各个组件:

CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().setDefaultCredentialsProvider(provider).build();

client.start();

Observable<ObservableHttpResponse> observable = ObservableHttp.createRequest(HttpAsyncMethods.createGet(uri), client)
            .toObservable();

Observable<JsonArray> shareable = observable.flatMap(response -> response.getContent().map(bb -> {
        String stringVal = new String(bb);
        StringReader reader = new StringReader(stringVal);
        JsonObject jobj = Json.createReader(reader).readObject();
        return jobj.getJsonArray("elements");
    })).share();
Run Code Online (Sandbox Code Playgroud)

我需要获取Json数组,然后过滤数组的对象:

Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1"));
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2"));
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3"));
Run Code Online (Sandbox Code Playgroud)

我如何将其Observable<JsonArray>转换为ObservableJsonObject>

因为它是异步的,我不能使用forEach来创建某种数组来缓冲数据.

更新:

因此,使用CloseableHttpAsyncClient可能不是我想要实现的最好的解决方案 - 我今天早上(在所有事情的淋浴中)意识到我正在尝试异步处理数据然后进行异步调用.

理想情况下,调用CloseableHttpClient(sync)并将数据传递给Observable进行过滤将是一种更理想的方法(我不需要第一次调用来管理多个http调用).

    CloseableHttpClient client = HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build();

    StringBuffer result = new StringBuffer();

    try { …
Run Code Online (Sandbox Code Playgroud)

java json asynchronous reactive-programming rx-java

5
推荐指数
1
解决办法
3281
查看次数

标签 统计

asynchronous ×1

java ×1

json ×1

reactive-programming ×1

rx-java ×1