use*_*678 3 java multithreading rx-java
我有一些非常简单的代码,阅读一堆字符串并应用过滤器.我希望过滤器在多个线程上运行.
Iterable<String> outputs = Observable
.from(Files.readLines(new File("E:\\SAMA\\Test\\ImageNetBullets.txt"), Charset.forName("utf-8")))
.take(20).subscribeOn(Schedulers.from(threadPoolExecutor)).filter(str -> isURLOK(str))
.toBlocking().toIterable();
Run Code Online (Sandbox Code Playgroud)
从日志中,似乎Filter方法只在一个线程上运行:
In Thread pool-1-thread-1
In Thread pool-1-thread-1
http://farm2.static.flickr.com/1258/1479683334_3ff920d217.jpg
In Thread pool-1-thread-1
In Thread pool-1-thread-1
Run Code Online (Sandbox Code Playgroud)
我该如何加快速度?
RxJava本质上是顺序的.例如,使用map(Func1),Func1本身将与通过父序列的值非并发执行:
Observable.range(1, 10).map(v -> v * v).subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
这里,lambda v - > v*v将以顺序方式调用值1到10.
RxJava可以以管道中的阶段(range-> map-> subscribe)相对于彼此同时/并行发生的方式异步.例如:
Observable.range(1, 10)
.subscribeOn(Schedulers.computation())
.map(v -> v * v) // (1)
.observeOn(Schedulers.io())
.map(v -> -v) // (2)
.toBlocking()
.subscribe(System.out::println); // (3)
Run Code Online (Sandbox Code Playgroud)
这里,(1)可以与(2)和(3)并行运行,即,(2)计算av = 3*3,(1)可能已经计算v = 5并且(3)-1在同一时间打印出来时间.
如果你想同时处理序列的元素,你必须将序列"分出"成子Observable,然后将结果连接回flatMap:
Observable.range(1, 10)
.flatMap(v ->
Observable.just(v)
.subscribeOn(Schedulers.computation())
.map(v -> v * v)
)
.toBlocking()
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
在这里,每个值v将启动一个Observable在后台线程上运行的新值,并通过map()进行计算.v = 1可以在线程1上v = 2运行,可以在线程2上v = 3运行,可以在线程1上运行但是在v = 1计算之后严格运行.
| 归档时间: |
|
| 查看次数: |
247 次 |
| 最近记录: |