使用 RxJava2 下载带有进程的文件

CoX*_*ier 5 rx-java rx-java2

正如标题所示,我想使用 RxJava2 和 okhttp 下载文件。而且还需要过程。

在我看来,数据流是String url-> Response-> 乘法Float process

所以首先我需要一个Single发出Response.

Single.create((SingleOnSubscribe<Response>) emitter -> {
    emitter.onSuccess(xx);// or throw error
});
Run Code Online (Sandbox Code Playgroud)

接下来我需要Flowable对吗?因为我想向观察者发送多个进程信息。

那么这里我该怎么办呢?提前致谢。

编辑

我尝试转换SingleFlowable.

Single.create((SingleOnSubscribe<Response>) emitter -> {
    emitter.onSuccess(xxx);// or throw error
}).toFlowable()
        .flatMap((Function<Response, Publisher<Float>>) response -> Flowable.create(xxx))
        .subscribe(process -> {});

}
Run Code Online (Sandbox Code Playgroud)

但我不知道这样做是否合适。

编辑

我不关心 okhttp 细节,我只关心与RxJava诸如Single和相关的内容Flowable

小智 1

这是如何使用 Retrofit 2、OkHttp、Okio 和 RxJava 执行此操作的示例。

import com.squareup.okhttp.ResponseBody;
import okio.BufferedSink;
import okio.Okio;
import retrofit.Response;
import retrofit.http.GET;
import retrofit.http.Query;
import rx.Observable;

interface ApiService {
    @GET("/media/download")
    Observable<Response<ResponseBody>> download(@Query("id") String id);
}

public Observable<File> download(String id) {
    return service.download(id)
        .flatMap(this::saveFile);
}

public Observable<File> saveFile(Response<ResponseBody> response) {
    return Observable.create((Observable.OnSubscribe<File>) subscriber -> {
        try {
            // you can access headers of response
            String header = response.headers().get("Content-Disposition");
            // this is specific case, it's up to you how you want to save your file
            // if you are not downloading file from direct link, you might be 
            // lucky to obtain file name from header
            String fileName = header.replace("attachment; filename=", "");
            // will create file in global Music directory, can be any other 
            // directory, just don't forget to handle permissions

            File file = new File(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_MUSIC)
                    .getAbsoluteFile(), fileName);

            // No space before "(Environment.DIRECTORY_MUSIC)"
            BufferedSink sink = Okio.buffer(Okio.sink(file));
            // you can access body of response
            sink.writeAll(response.body().source());
            sink.close();
            subscriber.onNext(file);
            subscriber.onCompleted();
        } catch (IOException e) {
            e.printStackTrace();
            subscriber.onError(e);
        }
    });
}
Run Code Online (Sandbox Code Playgroud)