延迟后结合框架重试?

mat*_*att 11 ios swift combine

我看怎么.retry直接使用,出错后重新订阅,像这样:

    URLSession.shared.dataTaskPublisher(for:url)
        .retry(3)
Run Code Online (Sandbox Code Playgroud)

但这似乎非常简单。如果我认为等待一段时间后此错误可能会消失怎么办?我可以插入一个.delay运算符,但是即使没有错误,延迟也会起作用。并且似乎没有一种方法可以有条件地应用运算符(即仅在出现错误时)。

我看到了如何通过从头开始编写 RetryWithDelay 操作符来解决这个问题,而且确实这样的操作符是由第三方编写的。但是有没有办法说“如果有错误就延迟”,纯粹使用我们给定的运算符?

我的想法是我可以使用.catch,因为它的功能仅在出现错误时运行。但是该函数需要返回一个发布者,我们将使用哪个发布者?如果我们返回somePublisher.delay(...)后跟.retry,我们就会.retry向错误的出版商申请,不是吗?

hec*_*ckj 10

这是不久前使用Combine项目回购的一个话题——整个线程:https : //github.com/heckj/swiftui-notes/issues/164

长短是我们做了一个例子,我认为它可以满足您的需求,尽管它确实使用了catch

let resultPublisher = upstreamPublisher.catch { error -> AnyPublisher<String, Error> in
    return Publishers.Delay(upstream: upstreamPublisher,
                            interval: 3,
                            tolerance: 1,
                            scheduler: DispatchQueue.global())
    // moving retry into this block reduces the number of duplicate requests
    // In effect, there's the original request, and the `retry(2)` here will operate
    // two additional retries on the otherwise one-shot publisher that is initiated with
    // the `Publishers.Delay()` just above. Just starting this publisher with delay makes
    // an additional request, so the total number of requests ends up being 4 (assuming all
    // fail). However, no delay is introduced in this sequence if the original request
    // is successful.
    .retry(2)
    .eraseToAnyPublisher()
}
Run Code Online (Sandbox Code Playgroud)

这是引用了我在 book/online 中的重试模式,这基本上就是你所描述的(但不是你问的)。

我在该问题上与之通信在该线程中提供了一个变体作为可能也很有趣的扩展:

extension Publisher {
  func retryWithDelay<T, E>()
    -> Publishers.Catch<Self, AnyPublisher<T, E>> where T == Self.Output, E == Self.Failure
  {
    return self.catch { error -> AnyPublisher<T, E> in
      return Publishers.Delay(
        upstream: self,
        interval: 3,
        tolerance: 1,
        scheduler: DispatchQueue.global()).retry(2).eraseToAnyPublisher()
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

  • 这是不正确的,因为延迟延迟了事件,而不是订阅。因此,第一次重试将在第一次失败后立即执行。重试的结果会延迟3秒,这不是你想要的。 (5认同)

Sea*_*sen 6

我在接受的答案中发现了一些关于实现的怪癖。

  • 首先,前两次尝试将立即触发,因为第一次延迟仅在第二次尝试后生效。

  • 其次,如果任何一次重试尝试成功,输出值也会延迟,这似乎是不必要的。

  • 第三,扩展不够灵活,无法让用户决定将重试尝试分派到哪个调度程序。

经过一番折腾,我最终得到了这样的解决方案:

public extension Publisher {
    /**
     Creates a new publisher which will upon failure retry the upstream publisher a provided number of times, with the provided delay between retry attempts.
     If the upstream publisher succeeds the first time this is bypassed and proceeds as normal.

     - Parameters:
        - retries: The number of times to retry the upstream publisher.
        - delay: Delay in seconds between retry attempts.
        - scheduler: The scheduler to dispatch the delayed events.

     - Returns: A new publisher which will retry the upstream publisher with a delay upon failure.

     ~~~
     let url = URL(string: "https://api.myService.com")!

     URLSession.shared.dataTaskPublisher(for: url)
         .retryWithDelay(retries: 4, delay: 5, scheduler: DispatchQueue.global())
         .sink { completion in
             switch completion {
             case .finished:
                 print("Success ")
             case .failure(let error):
                 print("The last and final failure after retry attempts: \(error)")
             }
         } receiveValue: { output in
             print("Received value: \(output)")
         }
         .store(in: &cancellables)
     ~~~
     */
    func retryWithDelay<S>(
        retries: Int,
        delay: S.SchedulerTimeType.Stride,
        scheduler: S
    ) -> AnyPublisher<Output, Failure> where S: Scheduler {
        self
            .delayIfFailure(for: delay, scheduler: scheduler)
            .retry(retries)
            .eraseToAnyPublisher()
    }

    private func delayIfFailure<S>(
        for delay: S.SchedulerTimeType.Stride,
        scheduler: S
    ) -> AnyPublisher<Output, Failure> where S: Scheduler {
        self.catch { error in
            Future { completion in
                scheduler.schedule(after: scheduler.now.advanced(by: delay)) {
                    completion(.failure(error))
                }
            }
        }
        .eraseToAnyPublisher()
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 感谢您考虑这个问题! (2认同)

The*_*eoK 6

我记得RxSwiftExt库有一个非常好的自定义重试 + 延迟运算符的实现,具有许多选项(线性和指数延迟,加上提供自定义闭包的选项),我尝试在合并中重新创建它。最初的实现在这里

\n
/**\n Provides the retry behavior that will be used - the number of retries and the delay between two subsequent retries.\n - `.immediate`: It will immediatelly retry for the specified retry count\n - `.delayed`: It will retry for the specified retry count, adding a fixed delay between each retry\n - `.exponentialDelayed`: It will retry for the specified retry count.\n The delay will be incremented by the provided multiplier after each iteration\n (`multiplier = 0.5` corresponds to 50% increase in time between each retry)\n - `.custom`: It will retry for the specified retry count. The delay will be calculated by the provided custom closure.\n The closure\'s argument is the current retry\n */\nenum RetryBehavior<S> where S: Scheduler {\n  case immediate(retries: UInt)\n  case delayed(retries: UInt, time: TimeInterval)\n  case exponentialDelayed(retries: UInt, initial: TimeInterval, multiplier: Double)\n  case custom(retries: UInt, delayCalculator: (UInt) -> TimeInterval)\n}\n\nfileprivate extension RetryBehavior {\n  \n  func calculateConditions(_ currentRetry: UInt) -> (maxRetries: UInt, delay: S.SchedulerTimeType.Stride) {\n    \n    switch self {\n    case let .immediate(retries):\n      // If immediate, returns 0.0 for delay\n      return (maxRetries: retries, delay: .zero)\n    case let .delayed(retries, time):\n      // Returns the fixed delay specified by the user\n      return (maxRetries: retries, delay: .seconds(time))\n    case let .exponentialDelayed(retries, initial, multiplier):\n      // If it is the first retry the initial delay is used, otherwise it is calculated\n      let delay = currentRetry == 1 ? initial : initial * pow(1 + multiplier, Double(currentRetry - 1))\n      return (maxRetries: retries, delay: .seconds(delay))\n    case let .custom(retries, delayCalculator):\n      // Calculates the delay with the custom calculator\n      return (maxRetries: retries, delay: .seconds(delayCalculator(currentRetry)))\n    }\n    \n  }\n  \n}\n\npublic typealias RetryPredicate = (Error) -> Bool\n\nextension Publisher {\n  /**\n   Retries the failed upstream publisher using the given retry behavior.\n   - parameter behavior: The retry behavior that will be used in case of an error.\n   - parameter shouldRetry: An optional custom closure which uses the downstream error to determine\n   if the publisher should retry.\n   - parameter tolerance: The allowed tolerance in firing delayed events.\n   - parameter scheduler: The scheduler that will be used for delaying the retry.\n   - parameter options: Options relevant to the scheduler\xe2\x80\x99s behavior.\n   - returns: A publisher that attempts to recreate its subscription to a failed upstream publisher.\n   */\n  func retry<S>(\n    _ behavior: RetryBehavior<S>,\n    shouldRetry: RetryPredicate? = nil,\n    tolerance: S.SchedulerTimeType.Stride? = nil,\n    scheduler: S,\n    options: S.SchedulerOptions? = nil\n  ) -> AnyPublisher<Output, Failure> where S: Scheduler {\n    return retry(\n      1,\n      behavior: behavior,\n      shouldRetry: shouldRetry,\n      tolerance: tolerance,\n      scheduler: scheduler,\n      options: options\n    )\n  }\n  \n  private func retry<S>(\n    _ currentAttempt: UInt,\n    behavior: RetryBehavior<S>,\n    shouldRetry: RetryPredicate? = nil,\n    tolerance: S.SchedulerTimeType.Stride? = nil,\n    scheduler: S,\n    options: S.SchedulerOptions? = nil\n  ) -> AnyPublisher<Output, Failure> where S: Scheduler {\n    \n    // This shouldn\'t happen, in case it does we finish immediately\n    guard currentAttempt > 0 else { return Empty<Output, Failure>().eraseToAnyPublisher() }\n    \n    // Calculate the retry conditions\n    let conditions = behavior.calculateConditions(currentAttempt)\n    \n    return self.catch { error -> AnyPublisher<Output, Failure> in\n      \n      // If we exceed the maximum retries we return the error\n      guard currentAttempt <= conditions.maxRetries else {\n        return Fail(error: error).eraseToAnyPublisher()\n      }\n      \n      if let shouldRetry = shouldRetry, shouldRetry(error) == false {\n        // If the shouldRetry predicate returns false we also return the error\n        return Fail(error: error).eraseToAnyPublisher()\n      }\n      \n      guard conditions.delay != .zero else {\n        // If there is no delay, we retry immediately\n        return self.retry(\n          currentAttempt + 1,\n          behavior: behavior,\n          shouldRetry: shouldRetry,\n          tolerance: tolerance,\n          scheduler: scheduler,\n          options: options\n        )\n        .eraseToAnyPublisher()\n      }\n      \n      // We retry after the specified delay\n      return Just(()).delay(for: conditions.delay, tolerance: tolerance, scheduler: scheduler, options: options).flatMap {\n        return self.retry(\n          currentAttempt + 1,\n          behavior: behavior,\n          shouldRetry: shouldRetry,\n          tolerance: tolerance,\n          scheduler: scheduler,\n          options: options\n        )\n        .eraseToAnyPublisher()\n      }\n      .eraseToAnyPublisher()\n    }\n    .eraseToAnyPublisher()\n  }\n  \n}\n
Run Code Online (Sandbox Code Playgroud)\n


mat*_*att 5

使用.catch确实是答案。我们只需引用数据任务发布者,并将该引用用作两个管道 \xe2\x80\x94 的头部(执行初始网络的外部管道和由函数生成的内部管道).catch

\n\n

让我们首先创建数据任务发布者并停止

\n\n
let pub = URLSession.shared.dataTaskPublisher(for: url).share()\n
Run Code Online (Sandbox Code Playgroud)\n\n

现在我可以形成管道的头部:

\n\n
let head = pub.catch {_ in pub.delay(for: 3, scheduler: DispatchQueue.main)}\n    .retry(3)\n
Run Code Online (Sandbox Code Playgroud)\n\n

应该这样做!head现在是一个管道,仅在出现错误时插入延迟运算符。然后我们可以基于 继续形成管道的其余部分head

\n\n

观察我们确实更换了出版商;如果发生故障并且catch函数运行,则pub上游的 成为.delay发布者,取代pub我们开始时的 。然而,它们是同一个对象(因为我说的是share),所以这是有区别的,没有区别。

\n