mat*_*att 27 ios swift combine
How can I make the asynchronous pipelines of the Combine framework line up synchronously (serially)?
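Suppose I have 50 URLs from which I want to download the corresponding resources, and suppose I want to download them one at a time. I know how to do that with Operation/OperationQueue, e.g. using an Operation subclass that doesn't declare itself finished until the download is complete. How would I do the same thing using Combine?
At the moment, all that occurs to me is to keep a global list of the remaining URLs, pop one off, set up a pipeline for that one download, do the download, and repeat in the pipeline's sink. That doesn't seem very Combine-like.
I did try making an array of the URLs and mapping it to an array of publishers. I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloads simultaneously. There isn't any Combine way to walk the array in a controlled manner, or is there?
(I also imagined doing something with Future, but I became hopelessly confused. I'm not used to this way of thinking.)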
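I've only tested this briefly, but at first pass it appears that each request waits for the previous request to finish before starting.
I'm posting this solution in search of feedback. Please be critical if this isn't a good solution.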
extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        // If the collection is empty, we can't just create an arbitrary publisher
        // so we return nil to indicate that we had nothing to serialize.
        if isEmpty { return nil }
        // We know at this point that it's safe to grab the first publisher.
        let first = self.first!
        // If there was only a single publisher then we can just return it.
        if count == 1 { return first.eraseToAnyPublisher() }
        // We're going to build up the output starting with the first publisher.
        var output = first.eraseToAnyPublisher()
        // We iterate over the rest of the publishers (skipping over the first.)
        for publisher in self.dropFirst() {
            // We build up the output by appending the next publisher.
            output = output.append(publisher).eraseToAnyPublisher()
        }
        return output
    }
}
A more concise version of this solution (provided by @matt):
extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}
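You could create a custom Subscriber whose receive(_:) returns Subscribers.Demand.max(1). In that case the subscriber requests the next value only after it has received one. The example uses Int.publisher, but a random delay in the map mimics network traffic :-)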
import PlaygroundSupport
import SwiftUI
import Combine

class MySubscriber: Subscriber {
    typealias Input = String
    typealias Failure = Never

    func receive(subscription: Subscription) {
        print("Received subscription", Thread.current.isMainThread)
        subscription.request(.max(1))
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        print("Received input: \(input)", Thread.current.isMainThread)
        return .max(1)
    }

    func receive(completion: Subscribers.Completion<Never>) {
        DispatchQueue.main.async {
            print("Received completion: \(completion)", Thread.current.isMainThread)
            PlaygroundPage.current.finishExecution()
        }
    }
}

(110...120)
    .publisher.receive(on: DispatchQueue.global())
    .map {
        print(Thread.current.isMainThread, Thread.current)
        usleep(UInt32.random(in: 10000 ... 1000000))
        return String(format: "%02x", $0)
    }
    .subscribe(on: DispatchQueue.main)
    .subscribe(MySubscriber())

print("Hello")
PlaygroundPage.current.needsIndefiniteExecution = true
The playground prints...
Hello
Received subscription true
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 6e false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 6f false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 70 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 71 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 72 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 73 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 74 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 75 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 76 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 77 false
false <NSThread: 0x600000053400>{number = 3, name = (null)}
Received input: 78 false
Received completion: finished true
UPDATE
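I finally found .flatMap(maxPublishers: ), which prompts me to update this interesting topic with a slightly different approach. Note that I am using a global queue for scheduling, not just a random delay, to make sure that receiving a serialized stream is not "random" or "lucky" behavior :-)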
import PlaygroundSupport
import Combine
import Foundation

PlaygroundPage.current.needsIndefiniteExecution = true

let A = (1 ... 9)
    .publisher
    .flatMap(maxPublishers: .max(1)) { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: DispatchQueue.global())
            }
    }
    .sink { value in
        print(value, "A")
    }

let B = (1 ... 9)
    .publisher
    .flatMap { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: RunLoop.main)
            }
    }
    .sink { value in
        print(" ",value, "B")
    }
prints
1 A
4 B
5 B
7 B
1 B
2 B
8 B
6 B
2 A
3 B
9 B
3 A
4 A
5 A
6 A
7 A
8 A
9 A
Based on what is written here, the .serialize()? defined in Clay Ellis's accepted answer could be replaced by
.publisher.flatMap(maxPublishers: .max(1)){$0}
while the "unserialized" version must use
.publisher.flatMap{$0}
"Real-world example"
import PlaygroundSupport
import Foundation
import Combine

let path = "postman-echo.com/get"
let urls: [URL] = "... which proves the downloads are happening serially .-)".map(String.init).compactMap { (parameter) in
    var components = URLComponents()
    components.scheme = "https"
    components.path = path
    components.queryItems = [URLQueryItem(name: parameter, value: nil)]
    return components.url
}
//["https://postman-echo.com/get?]

struct Postman: Decodable {
    var args: [String: String]
}

let collection = urls.compactMap { value in
    URLSession.shared.dataTaskPublisher(for: value)
        .tryMap { data, response -> Data in
            return data
        }
        .decode(type: Postman.self, decoder: JSONDecoder())
        .catch {_ in
            Just(Postman(args: [:]))
        }
}

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}

var streamA = ""
let A = collection
    .publisher.flatMap{$0}
    .sink(receiveCompletion: { (c) in
        print(streamA, " ", c, " .publisher.flatMap{$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamA)
    })

var streamC = ""
let C = collection
    .serialize()?
    .sink(receiveCompletion: { (c) in
        print(streamC, " ", c, " .serialize()?")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamC)
    })

var streamD = ""
let D = collection
    .publisher.flatMap(maxPublishers: .max(1)){$0}
    .sink(receiveCompletion: { (c) in
        print(streamD, " ", c, " .publisher.flatMap(maxPublishers: .max(1)){$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamD)
    })

PlaygroundPage.current.needsIndefiniteExecution = true
prints
.w.h i.c hporves ht edownloadsa erh appeninsg eriall y.-) finished .publisher.flatMap{$0}
... which proves the downloads are happening serially .-) finished .publisher.flatMap(maxPublishers: .max(1)){$0}
... which proves the downloads are happening serially .-) finished .serialize()?
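It seems to me that this could be very useful in other scenarios as well. Try using the default value of maxPublishers in the next snippet and compare the results :-)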
import Combine

let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()

let handle = subject
    .zip(sequencePublisher.print())
    //.publish
    .flatMap(maxPublishers: .max(1), { (pair) in
        Just(pair)
    })
    .print()
    .sink { letters, digits in
        print(letters, digits)
    }

"Hello World!".map(String.init).forEach { (s) in
    subject.send(s)
}
subject.send(completion: .finished)
Use flatMap(maxPublishers:transform:) with .max(1), e.g.
func imagesPublisher(for urls: [URL]) -> AnyPublisher<UIImage, URLError> {
    Publishers.Sequence(sequence: urls.map { self.imagePublisher(for: $0) })
        .flatMap(maxPublishers: .max(1)) { $0 }
        .eraseToAnyPublisher()
}
Where
func imagePublisher(for url: URL) -> AnyPublisher<UIImage, URLError> {
    URLSession.shared.dataTaskPublisher(for: url)
        .compactMap { UIImage(data: $0.data) }
        .receive(on: RunLoop.main)
        .eraseToAnyPublisher()
}
and
var imageRequests: AnyCancellable?

func fetchImages() {
    imageRequests = imagesPublisher(for: urls).sink { completion in
        switch completion {
        case .finished:
            print("done")
        case .failure(let error):
            print("failed", error)
        }
    } receiveValue: { image in
        // do whatever you want with the images as they come in
    }
}
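That results in:
But we should recognize that running the requests sequentially like this carries a big performance penalty. For example, if I bump it up to 6 at a time, it is more than twice as fast:
Personally, I'd recommend downloading sequentially only if you absolutely must (which, when downloading a series of images or files, is almost certainly not the case). Yes, performing requests concurrently can mean they don't finish in any particular order, but we can just use a structure that is order-independent (e.g. a dictionary rather than a simple array), and the performance gains are so significant that it's generally worth it.
But if you want them downloaded sequentially, the maxPublishers parameter can achieve that.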
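The order-independent approach mentioned above could look something like the following. This is only a sketch: collectKeyed is a hypothetical helper name (it is not part of Combine), the Just publishers stand in for real download publishers, and limit is assumed to be at least 1.

```swift
import Combine

// Hypothetical helper (not part of Combine): runs at most `limit` inner
// publishers at once and gathers the outputs into a dictionary keyed by the
// input, so the order in which results arrive does not matter.
// If an inner publisher emits several values, the latest one is kept.
func collectKeyed<Key: Hashable, P: Publisher>(
    _ keys: [Key],
    limit: Int,
    transform: @escaping (Key) -> P
) -> AnyPublisher<[Key: P.Output], P.Failure> {
    keys.publisher
        .setFailureType(to: P.Failure.self)
        .flatMap(maxPublishers: .max(limit)) { key -> Publishers.Map<P, (Key, P.Output)> in
            // Tag each result with the key that produced it.
            transform(key).map { (key, $0) }
        }
        .collect()
        .map { pairs -> [Key: P.Output] in
            Dictionary(pairs, uniquingKeysWith: { _, latest in latest })
        }
        .eraseToAnyPublisher()
}

// Usage with stand-in publishers; a real caller might pass URLs and a
// closure that builds a download publisher for each one.
var lengths: [String: Int] = [:]
let token = collectKeyed(["a", "bb", "ccc"], limit: 2) { Just($0.count) }
    .sink { lengths = $0 }
print(lengths)
```

With limit set to 1 this degenerates to the serial case; a larger limit trades ordering for throughput, and the dictionary makes the arrival order irrelevant.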
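From the original question:
I did try making an array of the URLs and mapping it to an array of publishers. I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloads simultaneously. There isn't any Combine way to walk the array in a controlled manner, or is there?
Here's a toy example to stand in for the real problem: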
let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap() {$0}
    .sink {print($0)}.store(in:&self.storage)
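This emits the integers from 1 to 10 in random order, arriving at random times. The goal is to do something with collection that will cause it to emit the integers from 1 to 10 in order.
Now we're going to change just one thing: in the line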
.flatMap {$0}
we add the maxPublishers parameter:
let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap(maxPublishers:.max(1)) {$0}
    .sink {print($0)}.store(in:&self.storage)
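Presto, we now do emit the integers from 1 to 10 in order, with random intervals between them.
Let's apply this to the original problem. To demonstrate, I need a fairly slow Internet connection and a fairly large resource to download. First, I'll do it with ordinary .flatMap: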
let eph = URLSessionConfiguration.ephemeral
let session = URLSession(configuration: eph)
let url = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let collection = [url, url, url]
    .map {URL(string:$0)!}
    .map {session.dataTaskPublisher(for: $0)
        .eraseToAnyPublisher()
    }
collection.publisher.setFailureType(to: URLError.self)
    .handleEvents(receiveOutput: {_ in print("start")})
    .flatMap() {$0}
    .map {$0.data}
    .sink(receiveCompletion: {comp in
        switch comp {
        case .failure(let err): print("error", err)
        case .finished: print("finished")
        }
    }, receiveValue: {_ in print("done")})
    .store(in:&self.storage)
The result is
start
start
start
done
done
done
finished
which shows that we are doing the three downloads simultaneously. Okay, now change
.flatMap() {$0}
to
.flatMap(maxPublishers:.max(1)) {$0}
The result now is:
start
done
start
done
start
done
finished
So we are now downloading serially, which is the problem originally to be solved.
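One caveat, given that the question mentions Future: limiting maxPublishers only controls when each inner publisher is subscribed to. A data task publisher starts its request on subscription, so it serializes cleanly, but a Future runs its closure as soon as it is created, so an array of Futures has already started all its work before flatMap sees it. Wrapping each one in Deferred postpones creation until subscription. A minimal sketch, in which eagerJob and the started log are hypothetical stand-ins for real downloads:

```swift
import Combine

var started: [Int] = []

// Hypothetical stand-in for a download: records when its work begins and
// then succeeds immediately.
let eagerJob: (Int) -> Future<Int, Never> = { id in
    Future { promise in
        started.append(id)
        promise(.success(id))
    }
}

// Building the array already runs all three Future closures.
let eagerJobs = [1, 2, 3].map(eagerJob)
let startedByEager = started

// Wrapped in Deferred, each Future is created only when it is subscribed to.
started = []
let lazyJobs = [1, 2, 3].map { id -> Deferred<Future<Int, Never>> in
    Deferred { eagerJob(id) }
}
let startedByLazy = started

// flatMap subscribes to one Deferred at a time, so the jobs start one by one.
var received: [Int] = []
let token = lazyJobs.publisher
    .flatMap(maxPublishers: .max(1)) { $0 }
    .sink { received.append($0) }
```

Here startedByEager already holds all three ids before anything subscribes, while startedByLazy is still empty at the same point.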
In keeping with the principle of TIMTOWTDI, we can instead chain the publishers with append to serialize them:
let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
let pub = collection.dropFirst().reduce(collection.first!) {
    return $0.append($1).eraseToAnyPublisher()
}
The result is a publisher that serializes the delayed publishers in the original collection. Let's prove it by subscribing to it:
pub.sink {print($0)}.store(in:&self.storage)
Sure enough, the integers now arrive in order (with random intervals between them).
We can encapsulate the creation of pub from a collection of publishers with an extension on Collection, as suggested by Clay Ellis:
extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}
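If the optional return value is inconvenient at the call site, one possible variation (an assumption on my part, not something proposed in the answers above) is to start the reduction from an Empty publisher, so an empty collection yields a publisher that simply finishes. The name serialized() is hypothetical, chosen to avoid clashing with serialize():

```swift
import Combine

extension Collection where Element: Publisher {
    // Hypothetical variant of serialize(): an empty collection produces a
    // publisher that completes immediately instead of returning nil.
    func serialized() -> AnyPublisher<Element.Output, Element.Failure> {
        reduce(Empty<Element.Output, Element.Failure>().eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}

// Values arrive in collection order.
var received: [Int] = []
let token = [Just(1), Just(2), Just(3)]
    .serialized()
    .sink { received.append($0) }

// An empty collection just completes.
var emptyCompleted = false
let emptyToken = [Just<Int>]()
    .serialized()
    .sink(receiveCompletion: { _ in emptyCompleted = true },
          receiveValue: { _ in })
```

The cost is one extra Concatenate layer at the front of the chain; whether that matters more than the optional is a judgment call.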