Seb*_*oth 4 reactive-programming swift rx-swift
给定一个这样的服务类:
class Service {
let networkService = NetworkService()
func handleJobA(input: String) -> Observable<ResultA> {
return networkService
.computeA(input)
.map { $0.a }
}
}
Run Code Online (Sandbox Code Playgroud)
当我像这样从调用方使用它时:
let service = Service()
Observable
.from(["Hello", "World"])
.flatMap {
service.handleJobA($0)
}
.subscribe()
Run Code Online (Sandbox Code Playgroud)
然后这将同时发送多个请求service。我希望流等待每个请求完成。使用merge操作符可以实现这一点。
Observable
.from(["Hello", "World"])
.flatMap {
Observable.just(
service.handleJobA($0)
)
}
.merge(maxConcurrent: 1)
.subscribe()
Run Code Online (Sandbox Code Playgroud)
到目前为止,一切都很好 - 该服务不会同时执行多项handleJobA任务。
然而,并发是一个服务细节,调用者不应该关心它。事实上,服务在稍后阶段可能会决定允许不同的并发值。
其次,当我添加一个新方法时handleJobB,它不能与作业 A 同时处于活动状态,反之亦然。
所以我的问题是:
您需要一个专用于该服务的串行调度程序。这是一个可以粘贴到游乐场的示例:
/// playground
import RxSwift
class Service {
func handleJobA(input: String) -> Observable<String> {
return Observable.create { observer in
print("start job a")
sleep(3)
observer.onNext(input)
print("complete job a")
observer.onCompleted()
return Disposables.create()
}.subscribeOn(scheduler)
}
func handleJobB(input: String) -> Observable<String> {
return Observable.create { observer in
print("start job b")
sleep(3)
observer.onNext(input)
print("complete job b")
observer.onCompleted()
return Disposables.create()
return Disposables.create()
}.subscribeOn(scheduler)
}
let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service")
}
let service = Service()
_ = Observable.from(["hello","world","swift"])
.flatMap { service.handleJobA(input: $0) }
.subscribe(onNext:{
print("result " + $0)
})
_ = Observable.from(["hello","world","swift"])
.flatMap { service.handleJobB(input: $0) }
.subscribe(onNext:{
print("result " + $0)
})
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1443 次 |
| 最近记录: |