我写了下面提到的方法来批量从couchbase服务器获取数据.
bucket.async()
.query(N1qlQuery.simple(query))
.doOnNext(res -> res.info().map(N1qlMetrics::elapsedTime).forEach(t -> System.out.println(t)))
.flatMap(AsyncN1qlQueryResult::rows)
.flatMap(row ->
bucket.async().
get(row.value().getString("id")))
.map(JsonDocument::content).
toList()
.toBlocking()
.single();
Run Code Online (Sandbox Code Playgroud)
传递查询时,此代码正常工作
"SELECT meta().id as id FROM bucket"
Run Code Online (Sandbox Code Playgroud)
但是当我使用类似的东西时
"SELECT meta().id as id FROM bucket order by id ASC"
Run Code Online (Sandbox Code Playgroud)
我得到的结果没有排序.但是,当我在查询控制台上运行相同的查询时,结果如预期的那样.这让我相信我在rxJava中做错了什么.请帮我解决这个问题.
由于flatMap()运营商应用并发流但不维护订单,因此订单丢失.
当你申请时,flatMap()你正在Observable为每个人创建和订阅一个新的onNext(),意味着对于每一行,你并行执行这一行:
bucket.async().
get(row.value().getString("id")))
Run Code Online (Sandbox Code Playgroud)
然后每个get操作将在不同的时间内完成,并且获取的内容将被无序发出.
如果要维护顺序但不想丢失并行性,则应使用仅维护1个活动流的concatMap(),并按顺序订阅每个提取操作.
如果你确实需要/想要并行,你应该使用concatMapEager(),它将并行执行每个创建的Observable,但会按顺序发出项目.
| 归档时间: |
|
| 查看次数: |
93 次 |
| 最近记录: |