Apple Combine framework: how to run multiple publishers in parallel and wait for all of them to finish?

kam*_*pro 5 swift combine

I'm just getting started with Combine. I wrote methods that make HTTP requests the "Combine" way, for example:

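func testRawDataTaskPublisher(for url: URL) -> AnyPublisher<Data, Error> {
    var request = URLRequest(url: url,
                             cachePolicy: .useProtocolCachePolicy,
                             timeoutInterval: 15)
    request.httpMethod = "GET"

    // urlSession is a URLSession property of the enclosing type (e.g. URLSession.shared)
    return urlSession.dataTaskPublisher(for: request)
        .map { $0.data }            // keep only the response body
        .mapError { $0 as Error }   // URLError -> Error to match the return type
        .eraseToAnyPublisher()
}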

I'd like to call this method many times and let all the tasks complete, for example:

let myURLs: [URL] = ...

for url in myURLs {
    let cancellable = testRawDataTaskPublisher(for: url)
        .sink(receiveCompletion: { _ in }) { data in
            // save the data...
        }
}

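The code above doesn't work because I have to store each cancellable in a variable that belongs to the class. The first question is: is it a good idea to store many (say 1,000) cancellables in something like a Set<AnyCancellable>? Won't that cause a memory leak?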

var cancellables = Set<AnyCancellable>()

...

    let cancellable = ...

    cancellables.insert(cancellable) // ???
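To make the first question concrete, here is a minimal sketch of keeping many subscriptions in a Set<AnyCancellable>. Downloader, start(values:) and reset() are hypothetical names, and Just stands in for the real network publisher so the sketch runs offline. The set by itself is not a leak: each AnyCancellable keeps its subscription alive until it is cancelled, removed from the set, or the set is deallocated with its owner. Two things worth knowing: finished subscriptions are not removed from the set automatically, and capturing self strongly in the sink closure can create a retain cycle, hence [weak self].

```swift
import Combine

// Hypothetical type: illustrates storing many subscriptions in a Set<AnyCancellable>.
final class Downloader {
    // Each AnyCancellable keeps its subscription alive until it is cancelled,
    // removed from the set, or the set is deallocated along with the object.
    var cancellables = Set<AnyCancellable>()
    private(set) var received: [Int] = []

    func start(values: [Int]) {
        for value in values {
            Just(value) // stand-in for testRawDataTaskPublisher(for:)
                .sink { [weak self] output in self?.received.append(output) } // weak self: no retain cycle
                .store(in: &cancellables) // equivalent to cancellables.insert(_:)
        }
    }

    func reset() {
        // Cancels anything still running and releases every stored subscription.
        cancellables.removeAll()
    }
}

let downloader = Downloader()
downloader.start(values: [1, 2, 3])
// Just finishes synchronously, yet the three AnyCancellable values are still in the set.
print(downloader.received, downloader.cancellables.count)
downloader.reset()
print(downloader.cancellables.count)
```

Calling reset() when the batch is done, or simply letting the owning object be deallocated, is one way to keep a set of 1,000 entries from lingering.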

The second question is: how do I start a task once all the cancellables have finished? I was thinking of something like this:

class Test {
    var cancellables = Set<AnyCancellable>()

    func run() {
        // show a loader

        let cancellable = runDownloads()
            .receive(on: RunLoop.main)
            .sink(receiveCompletion: { _ in }) { _ in
                // hide the loader
            }

        cancellables.insert(cancellable)
    }

    func runDownloads() -> AnyPublisher<Bool, Error> {
        let myURLs: [URL] = ...

        return Future<Bool, Error> { promise in
            let numberOfURLs = myURLs.count
            var numberOfFinishedTasks = 0

            for url in myURLs {
                let cancellable = self.testRawDataTaskPublisher(for: url)
                    .sink(receiveCompletion: { _ in }) { data in
                        // save the data...
                        numberOfFinishedTasks += 1

                        if numberOfFinishedTasks >= numberOfURLs {
                            promise(.success(true))
                        }
                    }

                self.cancellables.insert(cancellable)
            }
        }.eraseToAnyPublisher()
    }

    func testRawDataTaskPublisher(for url: URL) -> AnyPublisher<Data, Error> {
        ...
    }
}

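Normally I would use a DispatchGroup: start several HTTP tasks and use notify to get called when they have all finished. But I'd like to know how to write this in a modern way with Combine.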
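As a rough Combine counterpart to DispatchGroup.notify, one option (swapping in Publishers.MergeMany rather than a manual counter) is to merge all the request publishers and collect() their outputs into a single array. The sketch below assumes a hypothetical fakeDataPublisher(for:) in place of testRawDataTaskPublisher(for:) so it runs without a network; it fails for any URL whose path contains "bad".

```swift
import Combine
import Foundation

// Hypothetical stand-in for testRawDataTaskPublisher(for:) so the sketch runs
// offline: it fails for paths containing "bad" and otherwise returns the URL's bytes.
func fakeDataPublisher(for url: URL) -> AnyPublisher<Data, Error> {
    if url.path.contains("bad") {
        return Fail<Data, Error>(error: URLError(.badURL)).eraseToAnyPublisher()
    }
    return Just(Data(url.absoluteString.utf8))
        .setFailureType(to: Error.self)
        .eraseToAnyPublisher()
}

// Subscribes to every request publisher at once and emits one array
// after all of them have finished (the role DispatchGroup.notify plays).
func runDownloads(_ urls: [URL]) -> AnyPublisher<[Data], Error> {
    Publishers.MergeMany(urls.map { fakeDataPublisher(for: $0) })
        .collect()
        .eraseToAnyPublisher()
}

let urls = ["https://example.com/a", "https://example.com/b"].compactMap { URL(string: $0) }
var downloaded: [Data] = []
var downloadError: Error?

let cancellable = runDownloads(urls)
    .sink(receiveCompletion: { completion in
        if case .failure(let error) = completion { downloadError = error }
        // Hide the loader here: this runs once, after every request finished or one failed.
    }, receiveValue: { downloaded = $0 })

print(downloaded.count, downloadError == nil)
```

With this shape a single failed request fails the whole batch and collect() emits nothing; if partial results are acceptable, each request's error could be caught or replaced before merging. With real URLSession publishers, adding .receive(on: DispatchQueue.main) before sink keeps the UI work on the main thread.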

Gil*_*man 15

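You can run several operations in parallel by creating a collection of publishers, applying the flatMap operator, and then using collect to wait for all of them to finish before continuing. Here's an example you can run in a playground: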

import Combine
import Foundation

func delayedPublisher<Value>(_ value: Value, delay after: Double) -> AnyPublisher<Value, Never> {
  let p = PassthroughSubject<Value, Never>()
  DispatchQueue.main.asyncAfter(deadline: .now() + after) {
    p.send(value)
    p.send(completion: .finished)
  }
  return p.eraseToAnyPublisher()
}

let myPublishers = [1,2,3]
  .map{ delayedPublisher($0, delay: 1 / Double($0)).print("\($0)").eraseToAnyPublisher() }

let cancel = myPublishers
  .publisher
  .flatMap { $0 }
  .collect()
  .sink { result in
    print("result:", result)
  }
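With around 1,000 URLs you may not want every request in flight at once. A hedged variation on the approach above is flatMap(maxPublishers:), which limits how many inner publishers are subscribed at the same time. This only throttles work that starts on subscription (URLSession's dataTaskPublisher starts its task then); delayedPublisher above starts its timer as soon as it is created, so this sketch uses a hypothetical delayed(_:after:) helper built on Combine's delay operator, plus counters that exist only to observe the concurrency.

```swift
import Combine
import Foundation

// Bookkeeping used only to observe how many inner publishers run at once.
let lock = NSLock()
var inFlight = 0
var maxInFlight = 0

// A serial queue, so the delayed events are delivered one at a time and in order.
let queue = DispatchQueue(label: "example.delay")

// Hypothetical helper: emits `value` after `seconds`, starting only once subscribed.
func delayed(_ value: Int, after seconds: Double) -> AnyPublisher<Int, Never> {
    Just(value)
        .delay(for: .seconds(seconds), scheduler: queue)
        .handleEvents(
            receiveSubscription: { _ in
                lock.lock(); inFlight += 1; maxInFlight = max(maxInFlight, inFlight); lock.unlock()
            },
            receiveCompletion: { _ in
                lock.lock(); inFlight -= 1; lock.unlock()
            })
        .eraseToAnyPublisher()
}

let done = DispatchSemaphore(value: 0)
var collected: [Int] = []

let limited = (1...6).publisher
    // At most two inner publishers are subscribed at a time; flatMap asks
    // upstream for the next value only when one of them finishes.
    .flatMap(maxPublishers: .max(2)) { delayed($0, after: 0.05) }
    .collect()
    .sink { values in
        collected = values
        done.signal()
    }

done.wait() // blocking is fine in a script, not on an app's main thread
print(collected.sorted(), maxInFlight)
```

The limit of 2 is arbitrary; for network requests a small constant keeps the number of simultaneous connections bounded while collect() still waits for the whole batch.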

Here's the output:

1: receive subscription: (PassthroughSubject)
1: request unlimited
2: receive subscription: (PassthroughSubject)
2: request unlimited
3: receive subscription: (PassthroughSubject)
3: request unlimited
3: receive value: (3)
3: receive finished
2: receive value: (2)
2: receive finished
1: receive value: (1)
1: receive finished
result: [3, 2, 1]

Note that all the publishers are started immediately (in their original order).

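The 1 / Double($0) delay makes the first publisher take the longest to finish. Note the order of the values at the end: because the first publisher takes the longest, its value is the last item.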
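Because collect() gathers values in completion order, the result above is [3, 2, 1]. If the original order matters, one hedged approach is to tag each value with its index before merging and sort by that index after collecting. This sketch swaps in Publishers.MergeMany and Combine's delay operator for the playground's PassthroughSubject helper; the queue label is arbitrary.

```swift
import Combine
import Foundation

let inputs = [1, 2, 3]
let queue = DispatchQueue(label: "example.order") // serial scheduler for the delays
let finished = DispatchSemaphore(value: 0)
var ordered: [Int] = []

// Pair each value with its original position; as in the playground example,
// the first input gets the longest delay, so it finishes last.
let tagged = inputs.enumerated().map { index, value in
    Just((index, value))
        .delay(for: .seconds(0.1 / Double(value)), scheduler: queue)
}

let orderCancellable = Publishers.MergeMany(tagged)
    .collect() // pairs arrive in completion order, not input order
    .map { pairs in pairs.sorted { $0.0 < $1.0 }.map { $0.1 } } // restore input order
    .sink { values in
        ordered = values
        finished.signal()
    }

finished.wait()
print(ordered)
```

Sorting after collect() keeps the requests fully parallel; only the final array is reordered.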