如果我指定maxPublishers参数,则第一个 maxPublishers 事件之后的源事件将不会进行平面映射。虽然我只想限制并发数。也就是说,在第一个 maxPublishers 平面地图发布程序完成后继续处理下一个事件。
Publishers.Merge(
addImageRequestSubject
.flatMap(maxPublishers: .max(3)) { self.compressImage($0) }
.compactMap { $0 }
.flatMap(maxPublishers: .max(3)) { self.addImage($0) },
addVideoRequestSubject
.flatMap(maxPublishers: .max(3)) { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)
Run Code Online (Sandbox Code Playgroud)
我还尝试在操作队列的帮助下限制并发性。但maxConcurrentOperationCount好像没有什么效果。
Publishers.Merge(
addImageRequestSubject
.receive(on: imageCompressionQueue)
.flatMap { self.compressImage($0) }
.compactMap { $0 }
.receive(on: mediaAddingQueue)
.flatMap { self.addImage($0) },
addVideoRequestSubject
.receive(on: mediaAddingQueue)
.flatMap { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)
private lazy var imageCompressionQueue: OperationQueue = {
var queue = OperationQueue()
queue.maxConcurrentOperationCount = 3
return queue
}()
private lazy var mediaAddingQueue: OperationQueue = {
var queue = OperationQueue()
queue.maxConcurrentOperationCount = 3
return queue
}()
Run Code Online (Sandbox Code Playgroud)
平面地图出版商看起来是这样的:
func compressImage(_ image: UIImage) -> Future<Data?, Never> {
Future { promise in
DispatchQueue.global().async {
let result = image.compressTo(15)?.jpegData(compressionQuality: 1)
promise(Result.success(result))
}
}
}
Run Code Online (Sandbox Code Playgroud)
您已经非常漂亮地进入了操作员的用例.buffer。.flatMap其目的是通过累积否则会下降的值来补偿背压。
我将通过一个完全人为的例子来说明:
class ViewController: UIViewController {
let sub = PassthroughSubject<Int,Never>()
var storage = Set<AnyCancellable>()
var timer : Timer!
override func viewDidLoad() {
super.viewDidLoad()
sub
.flatMap(maxPublishers:.max(3)) { i in
return Just(i)
.delay(for: 3, scheduler: DispatchQueue.main)
.eraseToAnyPublisher()
}
.sink { print($0) }
.store(in: &storage)
var count = 0
self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) {
_ in
count += 1
self.sub.send(count)
}
}
}
Run Code Online (Sandbox Code Playgroud)
因此,我们的发布者每秒发出一个递增的整数,但我们的发布者flatMap需要.max(3)3 秒才能重新发布一个值。结果是我们开始错过价值观:
1
2
3
5
6
7
9
10
11
...
Run Code Online (Sandbox Code Playgroud)
解决办法就是在前面放一个缓冲区flatMap。它需要足够大,以将任何丢失的值保存足够长的时间,以便请求它们:
sub
.buffer(size: 20, prefetch: .keepFull, whenFull: .dropOldest)
.flatMap(maxPublishers:.max(3)) { i in
Run Code Online (Sandbox Code Playgroud)
结果是所有数值实际上都到达了sink. 当然,在现实生活中,如果缓冲区不够大,无法补偿发布者的价值排放率与反压的价值排放率之间的差异,我们仍然可能会丢失价值flatMap。
| 归档时间: |
|
| 查看次数: |
1501 次 |
| 最近记录: |