带有改造 2 的流媒体服务器 - 分块

Yam*_*ila 4 android chunked retrofit2

我应该从返回带有 Transfer-Encoding: chunked 的 json 的服务器流式传输一个端点。

我有以下代码,但我无法读取响应。我尝试了 responseBody.streamBytes() 并将输入流转换为 String 但我无法在主线程中执行此操作。我怎么能阅读回复?

@Streaming
@GET("stream/status")
Observable<ResponseBody> streamStatus();

Observable<ResponseBody> observable = ApiClientHelper.getClient().streamStatus();
    observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<ResponseBody>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(final ResponseBody responseBody) {
                   //DON'T KNOW HOW TO READ DATA
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
Run Code Online (Sandbox Code Playgroud)

编辑:

使用 CURL 的服务器响应:

@Streaming
@GET("stream/status")
Observable<ResponseBody> streamStatus();

Observable<ResponseBody> observable = ApiClientHelper.getClient().streamStatus();
    observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<ResponseBody>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(final ResponseBody responseBody) {
                   //DON'T KNOW HOW TO READ DATA
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
Run Code Online (Sandbox Code Playgroud)

Luk*_*asz 5

添加@Streaming注释会导致改造不会将整个文件移动到内存中,而是立即传递传入的字节。这使您能够处理可能大于总可用内存的数据流。但是,如果您尝试在主线程上执行此操作,您将得到一个android.os.NetworkOnMainThreadException,这就是我假设您得到的。所以问题出在.observeOn(AndroidSchedulers.mainThread).

编辑:公平警告。我没有运行这个。

    observable
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(new Observer<ResponseBody>() {
                @Override
                public void onSubscribe(Disposable disposable) {

                }

                @Override
                public void onNext(ResponseBody responseBody) {
                    InputStream inputStream = responseBody.byteStream();
                    BufferedReader br = null;
                    StringBuilder sb = new StringBuilder();

                    String line;
                    try {

                        br = new BufferedReader(new InputStreamReader(inputStream));
                        while (br.ready()) {
                            line = br.readLine();
                            sb.append(line);
                        }

                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        if (br != null) {
                            try {
                                br.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }

                    Log.d("streamed string", sb.toString()); // replace log with whatever you want to do with it.
                }

                @Override
                public void onError(Throwable throwable) {

                }

                @Override
                public void onComplete() {

                }
            });
Run Code Online (Sandbox Code Playgroud)