RxSwift,我如何链接不同的可观察量

Pun*_*nty 10 ios swift alamofire rx-swift

我仍然是Reactive编程的初学者,以及RxSwift.我想连锁两个不同的操作.在我的情况下,我只想从Web服务器下载一个zip文件,然后在本地解压缩.我也希望,同时显示下载文件的进度.所以我开始创建第一个observable:

 class func rx_download(req:URLRequestConvertible, testId:String) -> Observable<Float> {

    let destination:Request.DownloadFileDestination = ...

    let obs:Observable<Float> = Observable.create { observer in
       let request =  Alamofire.download(req, destination: destination)
        request.progress { _, totalBytesWritten, totalBytesExpectedToWrite in
            if totalBytesExpectedToWrite > 0 {
                observer.onNext(Float(totalBytesWritten) / Float(totalBytesExpectedToWrite))
            }
            else {
                observer.onNext(0)
            }
        }
        request.response {  _, response, _, error in
            if let responseURL = response {
                if responseURL.statusCode == 200 {
                    observer.onNext(1.0)
                    observer.onCompleted()
                } else  {
                    let error = NSError(domain: "error", code: responseURL.statusCode, userInfo: nil)
                    observer.onError(error)
                }
            } else {
                let error = NSError(domain: "error", code: 500, userInfo: nil)
                observer.onError(error)
            }

            }
            return AnonymousDisposable () {
                request.cancel()
            }
        }
    return obs.retry(3)

}
Run Code Online (Sandbox Code Playgroud)

之后,我为解压缩创建了一个类似的功能

class func rx_unzip(testId:String) -> Observable<Float> {
    return Observable.create { observer in
        do {
            try Zip.unzipFile(NSURL.archivePath(testId), destination: NSURL.resourceDirectory(testId), overwrite: true, password: nil)
                {progress in
                    observer.onNext(Float(progress))
                }
        } catch let error {
            observer.onError(error)
        }
        observer.onCompleted()
        return NopDisposable.instance
    }
}
Run Code Online (Sandbox Code Playgroud)

现在我在"查看模型层"上有这个逻辑,所以我下载 - >订阅完成 - >解压缩

我想要的是将两个Observable合二为一,以便先执行下载,然后再完成解压缩文件.有没有办法做到这一点?

iwi*_*not 5

Concat 运算符需要相同的数据类型

确实,concat运算符允许您强制执行可观察序列,但是使用时可能会遇到的一个问题concat是,concat运算符要求Observables具有相同的泛型类型。

let numbers     : Observable<Int>    = Observable.from([1,2,3])
let moreNumbers : Observable<Int>    = Observable.from([4,5,6])
let names       : Observable<String> = Observable.from(["Jose Rizal", "Leonor Rivera"])


// This works
numbers.concat(moreNumbers)

// Compile error
numbers.concat(names)
Run Code Online (Sandbox Code Playgroud)

FlatMap运算符允许您链接Observables 的序列

这是一个例子。

class Tag {
    var tag: String = ""
    init (tag: String) {
        self.tag = tag
    }
}

let getRequestReadHTML : Observable<String> = Observable
                            .just("<HTML><BODY>Hello world</BODY></HTML>")

func getTagsFromHtml(htmlBody: String) -> Observable<Tag> {
    return Observable.create { obx in

        // do parsing on htmlBody as necessary

        obx.onNext(Tag(tag: "<HTML>"))
        obx.onNext(Tag(tag: "<BODY>"))
        obx.onNext(Tag(tag: "</BODY>"))
        obx.onNext(Tag(tag: "</HTML>"))

        obx.onCompleted()

        return Disposables.create()
    }
}

getRequestReadHTML
    .flatMap{ getTagsFromHtml(htmlBody: $0) }
    .subscribe (onNext: { e in
        print(e.tag)
    })
Run Code Online (Sandbox Code Playgroud)

注意函数是getRequestReadHTMLtype Observable<String>时如何getTagsFromHtml类型Observable<Tag>

使用多个flatMap可以增加发射频率

但是要小心,因为flatMap运算符采用数组(例如[1,2,3])或序列(例如Observable),并将所有元素作为发射发出。这就是为什么已知会产生的转换的原因1...n

如果您定义了一个可观察的对象(例如网络呼叫),并且确定只有一个发射,那么您将不会遇到任何问题,因为它的转换是1...1(对于一个NSData来说是一个可观察的)。大!

但是,如果您的Observable有多个排放量,请格外小心,因为连锁flatMap经营者将意味着排放量将成倍增加(?)。

一个具体的例子是,当第一个可观察到的发射出3个发射时,flatMap运算符转换1...nn = 2的位置,这意味着现在总共发射了6个发射。另一个flatMap运算符可以再次转换1...nn = 2的位置,这意味着现在总共有12个发射。仔细检查这是否是您的预期行为。


Mic*_*uba 2

您可以使用concat运算符来链接这两个 Observables。生成的 Observable 将从next第一个 Observable 发送值,并在完成后从第二个 Observable 发送值。

有一个警告:您将获得范围从 0.0 到 1.0 的进度值rx_download,然后进度rx_unzip又将从 0.0 开始。如果您想在单个进度视图上显示进度,这可能会让用户感到困惑。

一种可能的方法是显示一个描述正在发生的事情的标签以及进度视图。您可以将map每个 Observable 转换为包含进度值和描述文本的元组,然后使用concat. 它可以看起来像这样:

let mappedDownload = rx_download.map {
    return ("Downloading", $0)
}

let mappedUnzip = rx_download.map {
    return ("Unzipping", $0)
}

mapped1.concat(mapped2)
 .subscribeNext({ (description, progress) in
    //set progress and show description
})
Run Code Online (Sandbox Code Playgroud)

当然,有很多可能的解决方案,但这更多的是一个设计问题,而不是编码问题。