组合框架:如何在继续之前异步处理数组的每个元素

mat*_*att 16 ios swift combine

我在使用 iOS Combine 框架时遇到了一些心理障碍。

我正在将一些代码从“手动”从远程 API 获取转换为使用组合。基本上,API 是 SQL 和 REST(实际上是 Salesforce,但这与问题无关)。代码用来做的是调用一个接受完成处理程序的 REST 查询方法。我正在做的是用结合未来到处替换它。到现在为止还挺好。

当以下场景发生时,问题就出现了(并且经常发生):

  1. 我们执行 REST 查询并返回一组“对象”。

  2. 但是这些“对象”并没有完全填充。它们中的每一个都需要来自某个相关对象的附加数据。因此,对于每个“对象”,我们使用来自该“对象”的信息进行另一个 REST 查询,从而为我们提供另一个“对象”数组。

  3. 这可能允许也可能不允许我们完成第一个“对象”的填充——否则,我们可能必须使用来自第二个“对象”中的每个“对象”的信息进行另一个REST 查询,依此类推。

结果是很多这样结构的代码(这是伪代码):

func fetchObjects(completion: @escaping ([Object] -> Void) {
    let restQuery = ...
    RESTClient.performQuery(restQuery) { results in
        let partialObjects = results.map { ... }
        let group = DispatchGroup()
        for partialObject in partialObjects {
            let restQuery = ... // something based on partialObject
            group.enter()
            RESTClient.performQuery(restQuery) { results in
                group.leave()
                let partialObjects2 = results.map { ... }
                partialObject.property1 = // something from partialObjects2
                partialObject.property2 = // something from partialObjects2
                // and we could go down yet _another_ level in some cases
            }
        }
        group.notify {
            completion([partialObjects])
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

每次我results in在伪代码中说,这是异步网络调用的完成处理程序。

好吧,我很清楚如何在 Combine 中链接异步调用,例如通过使用 Futures 和flatMap(再次使用伪代码):

let future1 = Future...
future1.map {
    // do something
}.flatMap {
    let future2 = Future...
    return future2.map {
        // do something
    }
}
// ...
Run Code Online (Sandbox Code Playgroud)

在那个代码中,我们形成的方式future2可以依赖于我们从 的执行中接收到的值future1,并且在map上面future2我们可以修改从上游接收到的内容,然后再将其传递到管道中。没问题。这一切都非常漂亮。

但这并没有给我我在组合前代码中所做的事情,即loop。在这里,我在循环中执行多个异步调用,在继续之前由 DispatchGroup 固定。问题是:

这样做的组合模式是什么?

记住情况。我有一些对象的数组。我想遍历该数组,对循环中的每个对象进行异步调用,异步获取新信息并在此基础上修改该对象,然后再继续执行管道。每个循环可能涉及进一步的嵌套循环,以异步方式收集更多信息:

Fetch info from online database, it's an array
   |
   V
For each element in the array, fetch _more_ info, _that's_ an array
   |
   V
For each element in _that_ array, fetch _more_ info
   |
   V
Loop thru the accumulated info and populate that element of the original array 
Run Code Online (Sandbox Code Playgroud)

这样做的旧代码是可怕的前瞻性,充分利用DispatchGroup就位嵌套完成处理和循环enter/ leave/ notify但它奏效了。我无法让我的组合代码以同样的方式工作。我该怎么做?基本上我的管道输出是一个数组,我觉得我需要将该数组拆分为单个元素,对每个元素异步执行某些操作,然后将这些元素重新组合成一个数组。如何?


我一直在解决这个问题的方法有效,但不能扩展,特别是当异步调用需要到达管道链中几步后的信息时。我一直在做这样的事情(我从/sf/answers/4109586701/得到这个想法):

  1. 一组对象从上游到达。

  2. 我将 aflatMapmap数组输入到一个发布者数组中,每个发布者都由一个 Future 领导,该 Future 获取与一个对象相关的更多在线内容,然后是一个生成修改后对象的管道。

  3. 现在我有一系列管道,每个管道产生一个对象。我使用merge该数组并从flatMap.

  4. collect将结果值放回到数组中。

但这似乎仍然需要大量工作,更糟糕的是,当每个子管道本身需要生成一系列子管道时,它无法扩展。这一切都变得难以理解,过去很容易到达完成块的信息(因为 Swift 的范围规则)不再到达主管道中的后续步骤(或者很难到达,因为我在管道中传递越来越大的元组)。

必须有一些简单的组合模式才能做到这一点,但我完全错过了它。请告诉我它是什么。

New*_*Dev 9

使用您的最新编辑和以下评论:

我实际上是在问是否有一个相当于“在这一步(涉及多个异步步骤)完成之前不要进行下一步”的组合

我认为这种模式可以通过.flatMap一个数组发布者 (Publishers.Sequence)来实现,它一个一个地发出并完成,然后是任何需要的每元素异步处理,并用 a 完成.collect,它等待所有元素在继续之前完成

所以,在代码中,假设我们有这些函数:

func getFoos() -> AnyPublisher<[Foo], Error>
func getPartials(for: Foo) -> AnyPublisher<[Partial], Error>
func getMoreInfo(for: Partial, of: Foo) -> AnyPublisher<MoreInfo, Error>
Run Code Online (Sandbox Code Playgroud)

我们可以执行以下操作:

getFoos()
.flatMap { fooArr in 
    fooArr.publisher.setFailureType(to: Error.self)
 }

// per-foo element async processing
.flatMap { foo in

  getPartials(for: foo)
    .flatMap { partialArr in
       partialArr.publisher.setFailureType(to: Error.self)
     }

     // per-partial of foo async processing
    .flatMap { partial in

       getMoreInfo(for: partial, of: foo)
         // build completed partial with more info
         .map { moreInfo in
            var newPartial = partial
            newPartial.moreInfo = moreInfo
            return newPartial
         }
     }
     .collect()
     // build completed foo with all partials
     .map { partialArr in
        var newFoo = foo
        newFoo.partials = partialArr
        return newFoo
     }
}
.collect()
Run Code Online (Sandbox Code Playgroud)

(删除旧答案)


mat*_*att 6

使用公认的答案,我最终得到了这个结构:

head // [Entity]
    .flatMap { entities -> AnyPublisher<Entity, Error> in
        Publishers.Sequence(sequence: entities).eraseToAnyPublisher()
    }.flatMap { entity -> AnyPublisher<Entity, Error> in
        self.makeFuture(for: entity) // [Derivative]
            .flatMap { derivatives -> AnyPublisher<Derivative, Error> in
                Publishers.Sequence(sequence: derivatives).eraseToAnyPublisher()
            }
            .flatMap { derivative -> AnyPublisher<Derivative2, Error> in
                self.makeFuture(for: derivative).eraseToAnyPublisher() // Derivative2
        }.collect().map { derivative2s -> Entity in
            self.configuredEntity(entity, from: derivative2s)
        }.eraseToAnyPublisher()
    }.collect()
Run Code Online (Sandbox Code Playgroud)

这正是我所寻找的优雅的紧密度!所以想法是:

我们收到一个数组,我们需要异步处理每个元素。旧的方法是一个 DispatchGroup 和一个for...in循环。组合等效为:

  • 该行的等效项for...inflatMapPublishers.Sequence。

  • DispatchGroup(处理异步性)的等价物是一个进一步的flatMap(在单个元素上)和一些发布者。就我而言,我从基于我们刚刚收到的单个元素的未来开始。

  • 相当于末尾的右大括号collect(),等待处理所有元素并将数组重新组合在一起。

总结一下,模式是:

  1. flatMap数组到序列。
  2. flatMap将单个元素发送给发布者,该发布者在该元素上启动异步操作。
  3. 根据需要继续该发布者的链。
  4. collect回到数组中。

通过嵌套该模式,我们可以利用 Swift 作用域规则将需要处理的内容保留在作用域内,直到我们获得足够的信息来生成处理后的对象。