在 `publisher.receive(on: backgroundQueue)` 之后不会调用 `Sink.receiveValue`

JIE*_*ANG 1 ios swift combine

使用下面的代码,在我放置 后receive(on: backgroundQueue)receiveCompletion将被 100% 调用,但receiveValue块则不会。

xxxxPublisher
    .xxxx()
    .receive(on: backgroundQueue)
    .xxxx()
    .receive(on: DispatchQueue.main)
    .sink(receiveCompletion: { completion in
        // completion code
    }, receiveValue: { value in
        // receive value code
    }).store(in: &cancellables)
Run Code Online (Sandbox Code Playgroud)

这似乎不是一个好行为,我们不应该receive<S>(on scheduler: S, options: S.SchedulerOptions? = nil)这样使用吗?我错过了什么吗?

这是重现此错误的代码,如果运行此代码,您将看到它receiveCompletion被调用了 300 次,但receiveValue调用次数少于 300 次。

import Foundation
import Combine

private var cancellables: Set<AnyCancellable> = []
let backgroundQueue = DispatchQueue.global(qos: .background)
for i in 1...300 {
    runPublisher(i)
}
var sinkCompletedIndices = Set<Int>()
var sinkOutputIndices = Set<Int>()

func runPublisher(_ index: Int) {
    [1].publisher
    .receive(on: backgroundQueue)
    .receive(on: DispatchQueue.main)
    .sink(receiveCompletion: { completion in
        NSLog("sink receiveCompletion")
        sinkCompletedIndices.insert(index)
    }, receiveValue: { value in
        NSLog("sink receiveValue")
        sinkOutputIndices.insert(index)
    }).store(in: &cancellables)
}
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 2) {
    let diff = sinkCompletedIndices.filter { !sinkOutputIndices.contains($0) }
    NSLog("Difference between completions and outputs \(diff)")
}
RunLoop.main.run()
Run Code Online (Sandbox Code Playgroud)

LuL*_*aGa 6

问题是Combine期望调度程序作为串行队列运行,但 DispatchQueue.global 是并发的。

如果您像这样声明后台队列:

let backgroundQueue = DispatchQueue(label: "any name will do", qos: .background)
Run Code Online (Sandbox Code Playgroud)

一切都会按预期进行。

Swift 论坛上有一个关于它的有趣讨论:

https://forums.swift.org/t/runloop-main-or-dispatchqueue-main-when-using-combine-scheduler/26635