如何仅在收到第一条消息后应用组合运算符?

ken*_*nyc 7 macos ios swift combine

Combine仅使用内置运算符的情况下,有没有办法跳过第一个值上的运算符,然后将该运算符应用于所有后续值?

考虑以下:

publisher
  .debounce(...)
  .sink(...)
Run Code Online (Sandbox Code Playgroud)

在这种安排中,debounce将等待指定的超时时间过去,然后再将值传递给sink。然而,很多时候您只想debounce在第一个元素之后启动。例如,如果用户尝试过滤联系人列表,他们很可能只在文本字段中输入一个字母。如果是这种情况,应用程序可能应该立即开始过滤,而不必等待debounce超时。

我知道Drop发布者,但我似乎找不到它们的组合来执行更多的“跳过”操作,以便接收sink每个值,但debounce在第一个值上被忽略。

像下面这样:

publisher
  .if_first_element_passthrough_to_sink(...), else_debounce(...)
  .sink(...)
Run Code Online (Sandbox Code Playgroud)

内置运算符可以实现类似的功能吗?

澄清

由于我最初的帖子没有应有的那么清晰,所以需要进行一些澄清...下面由 Asperi 提供的答案非常接近,但理想情况下,序列中的第一个元素始终被传递,然后会debounce启动。

假设用户正在输入以下内容:

ABC ...(暂停打字几秒钟)...D ...(暂停)...EFG

我想要的是:

  • ADE立即交付。
  • B C合并为仅C使用debounce
  • F G合并为仅G使用debounce

Asp*_*eri 7

如果我正确理解了您的需求,则可以基于Concatenate如下内容(伪代码)来实现:

let originalPublisher = ...
let publisher = Publishers.Concatenate(
        prefix: originalPublisher.first(),
        suffix: originalPublisher.debounce(for: 0.5, scheduler: RunLoop.main))
    .eraseToAnyPublisher()
Run Code Online (Sandbox Code Playgroud)

因此,前缀只是从原始发布者向下游发送第一个元素并完成,之后后缀只是使用 传递所有后续元素debounce


rob*_*off 4

在您的特定情况下debounce,您可能更喜欢 的行为throttle。它立即发送第一个元素,然后每次发送不超过一个元素interval

\n\n

不管怎样,你能用合并内置函数来做到这一点吗?是的,有一些困难。应该?也许\xe2\x80\xa6

\n\n

这是您目标的大理石图:

\n\n

修改后的去抖算子的弹珠图

\n\n

每次一个值进入 kennyc-debouncer 时,它都会启动一个计时器(由阴影区域表示)。如果计时器运行时有值到达,kennyc-debouncer 会保存该值并重新启动计时器。当计时器到期时,如果在计时器运行期间有任何值到达,kennyc-debouncer 会立即发出最新值。

\n\n

scan运算符允许我们保持每次输入到达时我们都会发生变化的状态。我们需要将两种输入发送到scan:来自上游发布者的输出和计时器触发。因此,让我们为这些输入定义一个类型:

\n\n
fileprivate enum DebounceEvent<Value> {\n    case value(Value)\n    case timerFired\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

我们的内心需要什么样的状态scan?我们肯定需要调度程序、间隔和调度程序选项,以便我们可以设置计时器。

\n\n

我们还需要一个PassthroughSubject可以用来将计时器触发转换为输入的scan操作员的输入。

\n\n

我们实际上无法取消并重新启动计时器,因此,当计时器触发时,我们将查看它是否应该重新启动。如果是这样,我们将启动另一个计时器。因此,我们需要知道计时器是否正在运行,计时器触发时要发送什么输出,以及计时器的重新启动时间(如果需要重新启动)。

\n\n

自从scan\ 的输出是整个状态值,因此我们还需要状态包含要发送到下游的输出值(如果有)。

\n\n

这是状态类型:

\n\n
fileprivate struct DebounceState<Value, S: Scheduler> {\n    let scheduler: S\n    let interval: S.SchedulerTimeType.Stride\n    let options: S.SchedulerOptions?\n\n    let subject = PassthroughSubject<Void, Never>()\n\n    enum TimerState {\n        case notRunning\n        case running(PendingOutput?)\n\n        struct PendingOutput {\n            var value: Value\n            var earliestDeliveryTime: S.SchedulerTimeType\n        }\n    }\n\n    var output: Value? = nil\n    var timerState: TimerState = .notRunning\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

现在让我们看看如何实际使用scan与其他一些运算符一起使用来实现 kennyc 版本的 debounce:

\n\n
extension Publisher {\n    func kennycDebounce<S: Scheduler>(\n        for dueTime: S.SchedulerTimeType.Stride,\n        scheduler: S,\n        options: S.SchedulerOptions? = nil\n    ) -> AnyPublisher<Output, Failure>\n    {\n        let initialState = DebounceState<Output, S>(\n            scheduler: scheduler,\n            interval: dueTime,\n            options: options)\n        let timerEvents = initialState.subject\n            .map { _ in DebounceEvent<Output>.timerFired }\n            .setFailureType(to: Failure.self)\n        return self\n            .map { DebounceEvent.value($0) }\n            .merge(with: timerEvents)\n            .scan(initialState) { $0.updated(with: $1) }\n            .compactMap { $0.output }\n            .eraseToAnyPublisher()\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

我们首先构建初始状态scan运算符的初始状态。

\n\n

然后,我们创建一个发布者,将Void状态的输出转换PassthroughSubject.timerFired事件。

\n\n

最后,我们构建了完整的管道,它有四个阶段:

\n\n
    \n
  1. 将上游输出(从self)变为.value事件。

  2. \n
  3. 将值事件与计时器事件合并。

  4. \n
  5. 用于scan通过值和计时器事件更新去抖状态。实际工作是通过updated(with:)我们将添加到的方法完成的DebounceState下面添加的方法完成的。

  6. \n
  7. 将完整状态映射到我们想要传递给下游的值,并丢弃空值(当上游事件被反跳抑制时会发生这种情况)。

  8. \n
\n\n

剩下的就是编写updated(with:)方法了。它查看每个传入事件的类型(valuetimerFired)和计时器的状态,以决定新状态应该是什么,并在必要时设置新计时器。

\n\n
extension DebounceState {\n    func updated(with event: DebounceEvent<Value>) -> DebounceState<Value, S> {\n        var answer = self\n        switch (event, timerState) {\n        case (.value(let value), .notRunning):\n            answer.output = value\n            answer.timerState = .running(nil)\n            scheduler.schedule(after: scheduler.now.advanced(by: interval), tolerance: .zero, options: options) { [subject] in subject.send() }\n        case (.value(let value), .running(_)):\n            answer.output = nil\n            answer.timerState = .running(.init(value: value, earliestDeliveryTime: scheduler.now.advanced(by: interval)))\n        case (.timerFired, .running(nil)):\n            answer.output = nil\n            answer.timerState = .notRunning\n        case (.timerFired, .running(.some(let pendingOutput))):\n            let now = scheduler.now\n            if pendingOutput.earliestDeliveryTime <= now {\n                answer.output = pendingOutput.value\n                answer.timerState = .notRunning\n            } else {\n                answer.output = nil\n                scheduler.schedule(after: pendingOutput.earliestDeliveryTime, tolerance: .zero, options: options) { [subject] in subject.send() }\n            }\n        case (.timerFired, .notRunning):\n            // Impossible!\n            answer.output = nil\n        }\n        return answer\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

有效吗?让我们测试一下:

\n\n
import PlaygroundSupport\nPlaygroundPage.current.needsIndefiniteExecution = true\n\nlet subject = PassthroughSubject<String, Never>()\nlet q = DispatchQueue.main\nlet start = DispatchTime.now()\nlet cfStart = CFAbsoluteTimeGetCurrent()\nq.asyncAfter(deadline: start + .milliseconds(100)) { subject.send("A") }\n// A should be delivered at start + 100ms.\nq.asyncAfter(deadline: start + .milliseconds(200)) { subject.send("B") }\nq.asyncAfter(deadline: start + .milliseconds(300)) { subject.send("C") }\n// C should be delivered at start + 800ms.\nq.asyncAfter(deadline: start + .milliseconds(1100)) { subject.send("D") }\n// D should be delivered at start + 1100ms.\nq.asyncAfter(deadline: start + .milliseconds(1800)) { subject.send("E") }\n// E should be delivered at start + 1800ms.\nq.asyncAfter(deadline: start + .milliseconds(1900)) { subject.send("F") }\nq.asyncAfter(deadline: start + .milliseconds(2000)) { subject.send("G") }\n// G should be delivered at start + 2500ms.\n\nlet ticket = subject\n    .kennycDebounce(for: .milliseconds(500), scheduler: q)\n    .sink {\n        print("\\($0) \\(((CFAbsoluteTimeGetCurrent() - cfStart) * 1000).rounded())") }\n
Run Code Online (Sandbox Code Playgroud)\n\n

输出:

\n\n
A 107.0\nC 847.0\nD 1167.0\nE 1915.0\nG 2714.0\n
Run Code Online (Sandbox Code Playgroud)\n\n

我不知道为什么后来的事件如此延迟。这可能只是游乐场的副作用。

\n