Swift Combine: Buffer upstream values and emit them at a steady rate?

Ube*_*son 12 ios swift combine

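Using the new Combine framework in iOS 13.

Suppose I have an upstream publisher sending values at a highly irregular rate: sometimes seconds or minutes may go by without any values, and then a burst of values may come through all at once. I'd like to create a custom publisher that subscribes to the upstream values, buffers them, and emits them at a regular, known cadence when they come in, but publishes nothing if they've all been exhausted.

For a concrete example:

  • t = 0 to 5000ms: no upstream values published
  • t = 5001ms: upstream publishes "a"
  • t = 5002ms: upstream publishes "b"
  • t = 5003ms: upstream publishes "c"
  • t = 5004ms to 10000ms: no upstream values published
  • t = 10001ms: upstream publishes "d"

My publisher subscribed to the upstream would produce values every 1 second:

  • t = 0 to 5000ms: no values published
  • t = 5001ms: publishes "a"
  • t = 6001ms: publishes "b"
  • t = 7001ms: publishes "c"
  • t = 7001ms to 10001ms: no values published
  • t = 10001ms: publishes "d"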
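To pin the desired timing down, here is a small model of it (a plain function, not a Combine operator): each value is emitted either when it arrives or one interval after the previous emission, whichever is later. The name `pacedTimes` and the use of integer milliseconds are assumptions made for illustration.

```swift
/// Computes when each value would be emitted, given when it arrived upstream.
/// Times are in milliseconds; `arrivals` is assumed to be sorted ascending.
func pacedTimes(arrivals: [Int], interval: Int) -> [Int] {
    var emissions: [Int] = []
    for arrival in arrivals {
        if let previous = emissions.last {
            // Never emit sooner than one interval after the previous emission.
            emissions.append(max(arrival, previous + interval))
        } else {
            // The very first value goes out as soon as it arrives.
            emissions.append(arrival)
        }
    }
    return emissions
}

let times = pacedTimes(arrivals: [5001, 5002, 5003, 10001], interval: 1000)
print(times) // [5001, 6001, 7001, 10001]
```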

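None of the existing publishers or operators in Combine seem to do quite what I want here.

  • throttle and debounce would simply sample the upstream values at a certain cadence and drop the ones in between (e.g. they would only publish "a" if the cadence was 1000ms)
  • delay would add the same delay to every value, but not space them out (e.g. if my delay was 1000ms, it would publish "a" at 6001ms, "b" at 6002ms, "c" at 6003ms)
  • buffer seems promising, but I can't quite figure out how to use it, that is, how to force it to publish a value from the buffer on demand. When I hooked up a sink to buffer it seemed to publish all the values instantly, not buffering at all.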
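A minimal sketch of the buffer behavior described in the last bullet, assuming sink's usual unlimited demand: because sink asks for everything up front, buffer never has to hold anything back. The variable names are illustrative.

```swift
import Combine

let subject = PassthroughSubject<String, Never>()
var received: [String] = []

// sink requests .unlimited, so the buffer forwards each value as soon as it gets it.
let cancellable = subject
    .buffer(size: 10, prefetch: .byRequest, whenFull: .dropOldest)
    .sink { received.append($0) }

subject.send("a")
subject.send("b")
subject.send("c")
print(received) // ["a", "b", "c"]
```

For buffer to actually hold values, the downstream subscriber has to request fewer values than upstream produces.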

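I thought about using some sort of combining operator like zip, merge, or combineLatest together with a Timer publisher, and that's probably the right approach, but I can't figure out exactly how to configure it to give the behavior I want.

Edit

Here's a marble diagram that hopefully illustrates what I'm going for: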

Upstream Publisher:
-A-B-C-------------------D-E-F--------|>

My Custom Operator:
-A----B----C-------------D----E----F--|>

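Edit 2: Unit test

Here's a unit test that should pass if modulatedPublisher (my desired buffered publisher) works as intended. It's not perfect, but it stores events (including the time received) as they arrive and then compares the time intervals between events, ensuring they are no smaller than the desired interval.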

func testCustomPublisher() {
    let expectation = XCTestExpectation(description: "async")
    var events = [Event]()

    let passthroughSubject = PassthroughSubject<Int, Never>()
    let cancellable = passthroughSubject
        .modulatedPublisher(interval: 1.0)
        .sink { value in
            events.append(Event(value: value, date: Date()))
            print("value received: \(value) at \(self.dateFormatter.string(from:Date()))")
        }

    // WHEN I send 3 events, wait 6 seconds, and send 3 more events
    passthroughSubject.send(1)
    passthroughSubject.send(2)
    passthroughSubject.send(3)

    DispatchQueue.main.asyncAfter(deadline: .now() + .milliseconds(6000)) {
        passthroughSubject.send(4)
        passthroughSubject.send(5)
        passthroughSubject.send(6)

        DispatchQueue.main.asyncAfter(deadline: .now() + .milliseconds(4000)) {

            // THEN I expect the stored events to be no closer together in time than the interval of 1.0s
            for i in 1 ..< events.count {
                let interval = events[i].date.timeIntervalSince(events[i-1].date)
                print("Interval: \(interval)")

                // There's some small error in the interval but it should be about 1 second since I'm using a 1s modulated publisher.
                XCTAssertTrue(interval > 0.99)
            }
            expectation.fulfill()
        }
    }

    wait(for: [expectation], timeout: 15)
}
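The test above refers to an Event type and a dateFormatter property that aren't shown. A minimal sketch of what they might look like follows; the field names match the test's usage, while the date format and the `minimumGap` helper are assumptions.

```swift
import Foundation

/// A received value plus the time it was received (field names taken from the test's usage).
struct Event {
    let value: Int
    let date: Date
}

/// Assumed formatter; the exact format behind the printed output isn't shown in the question.
let dateFormatter: DateFormatter = {
    let formatter = DateFormatter()
    formatter.dateFormat = "h:mm:ss.SSS"
    return formatter
}()

/// Hypothetical helper: the smallest gap between consecutive events,
/// or nil when there are fewer than two events.
func minimumGap(in events: [Event]) -> TimeInterval? {
    guard events.count > 1 else { return nil }
    return (1 ..< events.count)
        .map { events[$0].date.timeIntervalSince(events[$0 - 1].date) }
        .min()
}

let start = Date(timeIntervalSinceReferenceDate: 0)
let sample = [
    Event(value: 1, date: start),
    Event(value: 2, date: start.addingTimeInterval(1.0)),
    Event(value: 3, date: start.addingTimeInterval(2.5)),
]
let gap = minimumGap(in: sample)
```

With a helper like this, the loop in the test could be reduced to a single assertion on the minimum gap.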

The closest I've gotten is using zip, like so:

public extension Publisher where Self.Failure == Never {
    func modulatedPublisher(interval: TimeInterval) -> AnyPublisher<Output, Never> {
        let timerBuffer = Timer
        .publish(every: interval, on: .main, in: .common)
        .autoconnect()

      return timerBuffer
        .zip(self, { $1 })                  // should emit one input element ($1) every timer tick
        .eraseToAnyPublisher()
    }
}

This properly paces the first three events (1, 2, and 3), but not the second three (4, 5, and 6). The output:

value received: 1 at 3:54:07.0007
value received: 2 at 3:54:08.0008
value received: 3 at 3:54:09.0009
value received: 4 at 3:54:12.0012
value received: 5 at 3:54:12.0012
value received: 6 at 3:54:12.0012

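I believe this is happening because zip has some internal buffering capacity. The first three upstream events are buffered and emitted on the Timer's cadence, but during the 6 second wait the Timer's events are buffered instead. When the second set of upstream events fires, there are already Timer events waiting in the queue, so they are paired up and fired off immediately.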
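That buffering is easy to reproduce without a real Timer. In this sketch a PassthroughSubject named `ticks` stands in for the timer (an assumption for illustration); ticks that arrive while no value is waiting are stored by zip and pair up immediately once values appear.

```swift
import Combine

let ticks = PassthroughSubject<Void, Never>()   // stand-in for the Timer publisher
let values = PassthroughSubject<Int, Never>()
var received: [Int] = []

let cancellable = ticks
    .zip(values) { _, value in value }
    .sink { received.append($0) }

// Three "timer ticks" fire while upstream is idle; zip holds on to them.
ticks.send(())
ticks.send(())
ticks.send(())

// The burst then pairs with the stored ticks at once, with no pacing.
values.send(4)
values.send(5)
values.send(6)
print(received) // [4, 5, 6]
```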

rob*_*off 20

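This is an interesting problem. I played with various combinations of Timer.publish, buffer, zip, and throttle, but I couldn't get any combination to work quite the way you want. So let's write a custom subscriber.

What we'd really like is an API where, when we get an input from upstream, we also get the ability to control when the upstream delivers the next input. Something like this: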

extension Publisher {
    /// Subscribe to me with a stepping function.
    /// - parameter stepper: A function I'll call with each of my inputs, and with my completion.
    ///   Each time I call this function with an input, I also give it a promise function.
    ///   I won't deliver the next input until the promise is called with a `.more` argument.
    /// - returns: An object you can use to cancel the subscription asynchronously.
    func step(with stepper: @escaping (StepEvent<Output, Failure>) -> ()) -> AnyCancellable {
        ???
    }
}

enum StepEvent<Input, Failure: Error> {
    /// Handle the Input. Call `StepPromise` when you're ready for the next Input,
    /// or to cancel the subscription.
    case input(Input, StepPromise)

    /// Upstream completed the subscription.
    case completion(Subscribers.Completion<Failure>)
}

/// The type of callback given to the stepper function to allow it to continue
/// or cancel the stream.
typealias StepPromise = (StepPromiseRequest) -> ()

enum StepPromiseRequest {
    // Pass this to the promise to request the next item from upstream.
    case more

    // Pass this to the promise to cancel the subscription.
    case cancel
}

With this step API, we can write a pace operator that does what you want:

extension Publisher {
    func pace<Context: Scheduler, MySubject: Subject>(
        _ pace: Context.SchedulerTimeType.Stride, scheduler: Context, subject: MySubject)
        -> AnyCancellable
        where MySubject.Output == Output, MySubject.Failure == Failure
    {
        return step {
            switch $0 {
            case .input(let input, let promise):
                // Send the input from upstream now.
                subject.send(input)

                // Wait for the pace interval to elapse before requesting the
                // next input from upstream.
                scheduler.schedule(after: scheduler.now.advanced(by: pace)) {
                    promise(.more)
                }

            case .completion(let completion):
                subject.send(completion: completion)
            }
        }
    }
}

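This pace operator takes pace (the required interval between outputs), a scheduler on which to schedule events, and a subject on which to republish the inputs from upstream. It handles each input by sending it through subject, and then uses the scheduler to wait for the pace interval before asking for the next input from upstream.

Now we just have to implement the step operator. Combine doesn't give us too much help here. It does have a feature called "backpressure", which means a publisher cannot send an input downstream until the downstream has asked for it by sending a Subscribers.Demand upstream. Usually you see downstreams send an .unlimited demand upstream, but we're not going to do that. Instead, we're going to take advantage of backpressure. We won't send any demand upstream until the stepper completes a promise, and then we'll only send a demand of .max(1), so we make the upstream operate in lock-step with the stepper. (We also have to send an initial demand of .max(1) to start the whole process.)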
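To see backpressure in isolation, here is a stripped-down sketch: a subscriber that requests one value up front and then one more each time it is told to. The class name `OneAtATime` and its `requestNext()` method are made up for this illustration, and unlike the real subscriber developed below it does no locking, so it is only suitable for single-threaded use.

```swift
import Combine

/// Illustrative subscriber: it only ever asks upstream for one value at a time.
final class OneAtATime: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private var subscription: Subscription?
    private(set) var received: [Int] = []

    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.max(1))   // initial demand to start the process
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none                    // no additional demand until requestNext()
    }

    func receive(completion: Subscribers.Completion<Never>) {}

    /// Ask upstream for exactly one more value.
    func requestNext() {
        subscription?.request(.max(1))
    }
}

let subscriber = OneAtATime()
(1...3).publisher.subscribe(subscriber)
print(subscriber.received) // [1]

subscriber.requestNext()
print(subscriber.received) // [1, 2]
```

The sequence publisher used here honors demand, so it hands over each element only when asked.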

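Okay, so we need to implement a type that takes a stepper function and conforms to Subscriber. It's a good idea to review the Reactive Streams JVM Specification, because Combine is based on that specification.

What makes the implementation difficult is that several things can call into our subscriber asynchronously:

  • The upstream can call into the subscriber from any thread (but is required to serialize its calls).
  • After we've given promise functions to the stepper, the stepper can call those promises on any thread.
  • We want the subscription to be cancellable, and that cancellation can happen on any thread.
  • All this asynchronicity means we have to protect our internal state with a lock.
  • We have to be careful not to call out while holding that lock, to avoid deadlock.

We'll also protect the subscriber from shenanigans involving calling a promise repeatedly, or calling outdated promises, by giving each promise a unique id.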
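The id check can be sketched on its own, without any Combine types. The `PromiseGate` struct below is hypothetical (it is not part of the implementation that follows, which tracks the same two numbers in its locked state); id 0 plays the role of "no valid promise".

```swift
/// Hypothetical stand-alone model of the promise-id bookkeeping.
struct PromiseGate {
    private var nextId = 1      // ids start at 1 so that 0 can mean "none"
    private var validId = 0     // the only id that may currently be redeemed

    /// Issue a new promise id; any previously issued id becomes stale.
    mutating func issue() -> Int {
        let id = nextId
        nextId += 1
        validId = id
        return id
    }

    /// Returns true only for the first redemption of the latest id.
    mutating func redeem(_ id: Int) -> Bool {
        guard id != 0, id == validId else { return false }
        validId = 0
        return true
    }
}

var gate = PromiseGate()
let first = gate.issue()
let second = gate.issue()
let staleAccepted = gate.redeem(first)     // false: a newer promise replaced it
let latestAccepted = gate.redeem(second)   // true: this is the valid promise
let repeatAccepted = gate.redeem(second)   // false: already redeemed once
```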

So here's our basic subscriber definition:

import Combine
import Foundation

public class SteppingSubscriber<Input, Failure: Error> {

    public init(stepper: @escaping Stepper) {
        l_state = .subscribing(stepper)
    }

    public typealias Stepper = (Event) -> ()

    public enum Event {
        case input(Input, Promise)
        case completion(Completion)
    }

    public typealias Promise = (Request) -> ()

    public enum Request {
        case more
        case cancel
    }

    public typealias Completion = Subscribers.Completion<Failure>

    private let lock = NSLock()

    // The l_ prefix means it must only be accessed while holding the lock.
    private var l_state: State
    private var l_nextPromiseId: PromiseId = 1

    private typealias PromiseId = Int

    private var noPromiseId: PromiseId { 0 }
}

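Notice that I moved the auxiliary types from earlier (StepEvent, StepPromise, and StepPromiseRequest) into SteppingSubscriber and shortened their names.

Now let's consider l_state's mysterious type, State. What are all the different states our subscriber could be in?

  • We could be waiting to receive the Subscription object from upstream.
  • We could have received the Subscription from upstream and be waiting for a signal (an input or completion from upstream, or the completion of a promise from the stepper).
  • We could be calling out to the stepper, which we want to be careful about in case it completes a promise while we're calling out to it.
  • We could have been cancelled or have received a completion from upstream.

So here is our definition of State: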

extension SteppingSubscriber {
    private enum State {
        // Completed or cancelled.
        case dead

        // Waiting for Subscription from upstream.
        case subscribing(Stepper)

        // Waiting for a signal from upstream or for the latest promise to be completed.
        case subscribed(Subscribed)

        // Calling out to the stepper.
        case stepping(Stepping)

        var subscription: Subscription? {
            switch self {
            case .dead: return nil
            case .subscribing(_): return nil
            case .subscribed(let subscribed): return subscribed.subscription
            case .stepping(let stepping): return stepping.subscribed.subscription
            }
        }

        struct Subscribed {
            var stepper: Stepper
            var subscription: Subscription
            var validPromiseId: PromiseId
        }

        struct Stepping {
            var subscribed: Subscribed

            // If the stepper completes the current promise synchronously with .more,
            // I set this to true.
            var shouldRequestMore: Bool
        }
    }
}

Since we're using NSLock (for simplicity), let's define an extension to ensure we always match locking with unlocking:

fileprivate extension NSLock {
    @inline(__always)
    func sync<Answer>(_ body: () -> Answer) -> Answer {
        lock()
        defer { unlock() }
        return body()
    }
}

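Now we're ready to handle some events. The easiest event to handle is asynchronous cancellation, which is the Cancellable protocol's only requirement. If we're in any state except .dead, we want to become .dead and, if there's an upstream subscription, cancel it.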

extension SteppingSubscriber: Cancellable {
    public func cancel() {
        let sub: Subscription? = lock.sync {
            defer { l_state = .dead }
            return l_state.subscription
        }
        sub?.cancel()
    }
}

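Notice here that I don't want to call out to the upstream subscription's cancel function while lock is locked, because lock isn't a recursive lock and I don't want to risk deadlock. All uses of lock.sync follow the pattern of deferring any call-outs until after the lock is unlocked.

Now let's implement the Subscriber protocol requirements. First, let's handle receiving the Subscription from upstream. The only time this should happen is when we're in the .subscribing state, but .dead is also possible, in which case we just want to cancel the upstream subscription.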

extension SteppingSubscriber: Subscriber {
    public func receive(subscription: Subscription) {
        let action: () -> () = lock.sync {
            guard case .subscribing(let stepper) = l_state else {
                return { subscription.cancel() }
            }
            l_state = .subscribed(.init(stepper: stepper, subscription: subscription, validPromiseId: noPromiseId))
            return { subscription.request(.max(1)) }
        }
        action()
    }

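Notice that in this use of lock.sync (and in all later uses), I return an "action" closure so I can perform arbitrary call-outs after the lock has been unlocked.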
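The same "decide under the lock, call out after unlocking" pattern can be shown in miniature. The `OneShot` class here is a hypothetical example, not part of the subscriber; its handler deliberately calls back into `fire()`, which would deadlock a non-recursive NSLock if the handler ran while the lock was held.

```swift
import Foundation

/// Hypothetical example: runs its handler at most once, and never while holding the lock.
final class OneShot {
    private let lock = NSLock()
    // The l_ prefix means it must only be accessed while holding the lock.
    private var l_handler: (() -> Void)?

    init(handler: @escaping () -> Void) {
        l_handler = handler
    }

    func fire() {
        // Decide what to do while locked...
        lock.lock()
        let action = l_handler
        l_handler = nil
        lock.unlock()

        // ...but only call out once the lock has been released.
        action?()
    }
}

var count = 0
var shot: OneShot!
shot = OneShot {
    count += 1
    shot.fire()   // re-entrant call: harmless, because the lock is not held here
}
shot.fire()
shot.fire()
print(count) // 1
```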

The next Subscriber protocol requirement we'll tackle is receiving a completion:

    public func receive(completion: Subscribers.Completion<Failure>) {
        let action: (() -> ())? = lock.sync {
            // The only state in which I have to handle this call is .subscribed:
            // - If I'm .dead, either upstream already completed (and shouldn't call this again),
            //   or I've been cancelled.
            // - If I'm .subscribing, upstream must send me a Subscription before sending me a completion.
            // - If I'm .stepping, upstream is currently signalling me and isn't allowed to signal
            //   me again concurrently.
            guard case .subscribed(let subscribed) = l_state else {
                return nil
            }
            l_state = .dead
            return { [stepper = subscribed.stepper] in
                stepper(.completion(completion))
            }
        }
        action?()
    }

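The most complex Subscriber protocol requirement for us is receiving an Input:

  • We have to create a promise.
  • We have to pass the promise to the stepper.
  • The stepper could complete the promise before returning.
  • After the stepper returns, we have to check whether it completed the promise with .more and, if so, return the appropriate demand to upstream.

Since we have to call out to the stepper in the middle of this work, we have some ugly nesting of lock.sync calls.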

    public func receive(_ input: Input) -> Subscribers.Demand {
        let action: (() -> Subscribers.Demand)? = lock.sync {
            // The only state in which I have to handle this call is .subscribed:
            // - If I'm .dead, either upstream completed and shouldn't call this,
            //   or I've been cancelled.
            // - If I'm .subscribing, upstream must send me a Subscription before sending me Input.
            // - If I'm .stepping, upstream is currently signalling me and isn't allowed to
            //   signal me again concurrently.
            guard case .subscribed(var subscribed) = l_state else {
                return nil
            }

            let promiseId = l_nextPromiseId
            l_nextPromiseId += 1
            let promise: Promise = { request in
                self.completePromise(id: promiseId, request: request)
            }
            subscribed.validPromiseId = promiseId
            l_state = .stepping(.init(subscribed: subscribed, shouldRequestMore: false))
            return { [stepper = subscribed.stepper] in
                stepper(.input(input, promise))

                let demand: Subscribers.Demand = self.lock.sync {
                    // The only possible states now are .stepping and .dead.
                    guard case .stepping(let stepping) = self.l_state else {
                        return .none
                    }
                    self.l_state = .subscribed(stepping.subscribed)
                    return stepping.shouldRequestMore ? .max(1) : .none
                }

                return demand
            }
        }

        return action?() ?? .none
    }
} // end of extension SteppingSubscriber: Subscriber

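The last thing our subscriber needs to handle is the completion of a promise. This is complicated for several reasons:

  • We want to protect against a promise being completed multiple times.
  • We want to protect against an older promise being completed.
  • We can be in any state when a promise is completed.

Thus: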

extension SteppingSubscriber {
    private func completePromise(id: PromiseId, request: Request) {
        let action: (() -> ())? = lock.sync {
            switch l_state {
            case .dead, .subscribing(_): return nil
            case .subscribed(var subscribed) where subscribed.validPromiseId == id && request == .more:
                subscribed.validPromiseId = noPromiseId
                l_state = .subscribed(subscribed)
                return { [sub = subscribed.subscription] in
                    sub.request(.max(1))
                }
            case .subscribed(let subscribed) where subscribed.validPromiseId == id && request == .cancel:
                l_state = .dead
                return { [sub = subscribed.subscription] in
                    sub.cancel()
                }
            case .subscribed(_):
                // Multiple completion or stale promise.
                return nil
            case .stepping(var stepping) where stepping.subscribed.validPromiseId == id && request == .more:
                stepping.subscribed.validPromiseId = noPromiseId
                stepping.shouldRequestMore = true
                l_state = .stepping(stepping)
                return nil
            case .stepping(let stepping) where stepping.subscribed.validPromiseId == id && request == .cancel:
                l_state = .dead
                return { [sub = stepping.subscribed.subscription] in
                    sub.cancel()
                }
            case .stepping(_):
                // Multiple completion or stale promise.
                return nil
            }
        }

        action?()
    }
}

Whew!

With all that done, we can write the real step operator:

extension Publisher {
    func step(with stepper: @escaping (SteppingSubscriber<Output, Failure>.Event) -> ()) -> AnyCancellable {
        let subscriber = SteppingSubscriber<Output, Failure>(stepper: stepper)
        self.subscribe(subscriber)
        return .init(subscriber)
    }
}

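And then we can try out the pace operator from above. Since we don't do any buffering in SteppingSubscriber, and the upstream in general isn't buffered, we'll stick a buffer in between the upstream and our pace operator.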

    var cans: [AnyCancellable] = []

    func application(_ application: UIApplication, didFinishLaunchingWithOptions launchOptions: [UIApplication.LaunchOptionsKey: Any]?) -> Bool {
        let erratic = Just("A").delay(for: 0.0, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher()
            .merge(with: Just("B").delay(for: 0.3, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
            .merge(with: Just("C").delay(for: 0.6, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
            .merge(with: Just("D").delay(for: 5.0, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
            .merge(with: Just("E").delay(for: 5.3, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
            .merge(with: Just("F").delay(for: 5.6, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
            .handleEvents(
                receiveOutput: { print("erratic: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \($0)") },
                receiveCompletion: { print("erratic: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \($0)") }
        )
            .makeConnectable()

        let subject = PassthroughSubject<String, Never>()

        cans += [erratic
            .buffer(size: 1000, prefetch: .byRequest, whenFull: .dropOldest)
            .pace(.seconds(1), scheduler: DispatchQueue.main, subject: subject)]

        cans += [subject.sink(
            receiveCompletion: { print("paced: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \($0)") },
            receiveValue: { print("paced: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \($0)") }
            )]

        let c = erratic.connect()
        cans += [AnyCancellable { c.cancel() }]

        return true
    }

And here, at long last, is the output:

erratic: 223394.17115897 A
paced: 223394.171495405 A
erratic: 223394.408086369 B
erratic: 223394.739186984 C
paced: 223395.171615624 B
paced: 223396.27056174 C
erratic: 223399.536717127 D
paced: 223399.536782847 D
erratic: 223399.536834495 E
erratic: 223400.236808469 F
erratic: 223400.236886323 finished
paced: 223400.620542561 E
paced: 223401.703613078 F
paced: 223402.703828512 finished
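  • Timestamps are in units of seconds.
  • The erratic publisher's timings are, indeed, erratic, and sometimes close together in time.
  • The paced timings are always at least one second apart, even when the erratic events occur less than one second apart.
  • When an erratic event occurs more than one second after the prior event, the paced event is sent immediately following the erratic event, without further delay.
  • The paced completion occurs one second after the last paced event, even though the erratic completion occurs right after the last erratic event. The buffer doesn't send the completion until it receives another demand after it sends the last event, and that demand is delayed by the pacing timer.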

For ease of copy/paste, I've put the entire implementation of the step operator in this gist.


New*_*Dev 12

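Edit

There's an even simpler approach than the original one outlined below. It doesn't require a pacer, and instead uses the backpressure created by flatMap(maxPublishers: .max(1)).

flatMap sends a demand of 1 and doesn't ask for more until the publisher it returned (which we can delay) completes. We need a Buffer publisher upstream to buffer the values in the meantime.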

// for demo purposes, this subject sends a Date:
let subject = PassthroughSubject<Date, Never>()
let interval = 1.0

let pub = subject
   .buffer(size: .max, prefetch: .byRequest, whenFull: .dropNewest)
   .flatMap(maxPublishers: .max(1)) {
      Just($0)
        .delay(for: .seconds(interval), scheduler: DispatchQueue.main)
   }

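Original

I know this is an old question, but I think there's a much simpler way to implement this, so I thought I'd share.

The idea is similar to a .zip with a Timer, except that instead of a Timer, you .zip with a time-delayed "tick" from the previously sent value, which can be achieved with a CurrentValueSubject. A CurrentValueSubject is needed instead of a PassthroughSubject in order to seed the very first "tick".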

// for demo purposes, this subject sends a Date:
let subject = PassthroughSubject<Date, Never>()

let pacer = CurrentValueSubject<Void, Never>(())
let interval = 1.0

let pub = subject.zip(pacer)
   .flatMap { v in
      Just(v.0) // extract the original value
        .delay(for: .seconds(interval), scheduler: DispatchQueue.main)
        .handleEvents(receiveOutput: { _ in 
           pacer.send() // send the pacer "tick" after the interval
        }) 
   }

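What happens is that the .zip gates on the pacer, whose tick only arrives after a delay from the previously sent value.

If the next value comes earlier than the allowed interval, it waits for the pacer. If, however, the next value comes later, the pacer already has a new tick ready to provide instantly, so there is no extra wait.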
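The gating itself can be observed synchronously by ticking the pacer by hand instead of from a delay. This is only a sketch of the mechanism; the names `values` and `received` are illustrative, and the manual `pacer.send(())` call stands in for the tick that the delayed publisher would send.

```swift
import Combine

let values = PassthroughSubject<String, Never>()
let pacer = CurrentValueSubject<Void, Never>(())   // seeded with the first "tick"
var received: [String] = []

let cancellable = values
    .zip(pacer)
    .map { $0.0 }   // keep the value, drop the tick
    .sink { received.append($0) }

values.send("a")    // pairs with the seed tick right away
values.send("b")    // no tick available yet, so zip holds it
print(received)     // ["a"]

pacer.send(())      // the next tick releases the waiting value
print(received)     // ["a", "b"]
```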


If you use it like in your test case:

let c = pub.sink { print("\($0): \(Date())") }

subject.send(Date())
subject.send(Date())
subject.send(Date())

DispatchQueue.main.asyncAfter(deadline: .now() + 1.0) {
   subject.send(Date())
   subject.send(Date())
}

DispatchQueue.main.asyncAfter(deadline: .now() + 10.0) {
   subject.send(Date())
   subject.send(Date())
}

As a result, each value is printed along with the time it was received, and consecutive receive times are at least one second apart.