在后台线程中执行组合未来不起作用

iOS*_*eek 5 future grand-central-dispatch ios combine

如果你在 Playground 上运行它:

import Combine
import Foundation

struct User {
    let name: String
}

var didAlreadyImportUsers = false

var importUsers: Future<Bool, Never> {
    Future { promise in
        sleep(5)
        promise(.success(true))
    }
}

var fetchUsers: Future<[User], Error> {
    Future { promise in
        promise(.success([User(name: "John"), User(name: "Jack")]))
    }
}

var users: AnyPublisher<[User], Error> {
    if didAlreadyImportUsers {
        return fetchUsers
            .receive(on: DispatchQueue.global(qos: .userInitiated))
            .eraseToAnyPublisher()
    } else {
        return importUsers
            .receive(on: DispatchQueue.global(qos: .userInitiated))
            .setFailureType(to: Error.self)
            .combineLatest(fetchUsers)
            .map { $0.1 }
        .eraseToAnyPublisher()

    }
}

users
    .receive(on: DispatchQueue.global(qos: .userInitiated))
    .sink(receiveCompletion: { completion in
    print(completion)
}, receiveValue: { value in
    print(value)
})

print("run")

Run Code Online (Sandbox Code Playgroud)

输出将是:

[User(name: "John"), User(name: "Jack")]
run
finished
Run Code Online (Sandbox Code Playgroud)

但我期待得到:

run
[User(name: "John"), User(name: "Jack")]
finished
Run Code Online (Sandbox Code Playgroud)

因为接收器应该在后台线程中运行代码。我在这里缺少什么。我需要冲洗代码吗:

 sleep(5)
 promise(.success(true))
Run Code Online (Sandbox Code Playgroud)

在后台线程中?那么目的是什么

.receive(on: DispatchQueue.global(qos: .userInitiated))
Run Code Online (Sandbox Code Playgroud)

don*_*als 9

您的 Future 在创建后立即运行,因此在您的情况下,一旦访问此属性:

var importUsers: Future<Bool, Never> {
  Future { promise in
    sleep(5)
    promise(.success(true))

  }
}
Run Code Online (Sandbox Code Playgroud)

由于Future立即运行,这意味着传递给 Promise 的闭包会立即执行,使主线程在继续之前休眠 5 秒。在您的情况下,Future一旦您访问它,就会创建它users,这是在主线程上完成的。

receive(on:影响接收值的线程sink(或下游发布者),而不是创建值的位置。由于在您调用 时期货已经完成.sink,因此完成值和发出值将立即交付sink。在后台队列中,但仍然立即。

之后,你终于上print("run")线了。

如果用sleep(5)以下内容替换该位:

var importUsers: Future<Bool, Never> {
  Future { promise in
    DispatchQueue.global().asyncAfter(deadline: .now() + 5) {
      promise(.success(true))
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

并对您的订阅代码进行一些小调整:

import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

var cancellables = Set<AnyCancellable>()

users
  .receive(on: DispatchQueue.global(qos: .userInitiated))
  .sink(receiveCompletion: { completion in
    print(completion)
  }, receiveValue: { value in
    print(value)
  }).store(in: &cancellables)
Run Code Online (Sandbox Code Playgroud)

您将看到输出按预期打印,因为初始 future 不会阻塞主线程五秒钟。

或者,如果您保持睡眠并像这样订阅,您将看到相同的输出:

import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

var cancellables = Set<AnyCancellable>()

users
  .subscribe(on: DispatchQueue.global(qos: .userInitiated))
  .sink(receiveCompletion: { completion in
    print(completion)
  }, receiveValue: { value in
    print(value)
  }).store(in: &cancellables)
Run Code Online (Sandbox Code Playgroud)

原因是您subscribe在后台线程上,因此订阅和所有内容都是在主线程之外异步设置的,这会导致在收到结果print("run")之前运行。Future然而,一旦users访问该属性(位于主线程上),主线程仍然会休眠 5 秒,因为那是您初始化Future. 因此,整个输出会立即打印出来,而不是在 5 秒后休眠"run"


Leo*_*nyk 6

有一种便捷的方法可以达到预期的效果。合并有一个延迟发布者,它会等待subscribe(on:)接收者。所以代码应该是这样的

var fetchUsers: Future<[User], Error> {
    return Deferred {
        Future { promise in
            sleep(5)
            promise(.success([User(name: "John"), User(name: "Jack")]))
        }
    }.eraseToAnyPublisher()
}
var cancellables = Set<AnyCancellable>()

fetchUsers
    .subscribe(on: DispatchQueue.global(qos: .userInitiated))
    .sink(receiveCompletion: { completion in
        print(completion)
    }, receiveValue: { value in
        print(value)
    }).store(in: &cancellables)

print("run")
Run Code Online (Sandbox Code Playgroud)

这样的代码不会停止主队列,并且输出将如预期的那样

run
[User(name: "John"), User(name: "Jack")]
finished
Run Code Online (Sandbox Code Playgroud)