RxJava。读取文件到可观察对象

Joz*_*ard 5 java reactive-programming java-8 rx-java rx-java2

我对 RxJava 和响应式编程完全陌生。我有一个任务,我必须读取文件并将其存储到 Observable。我曾尝试在内部使用 BufferedReader 制作 Callable 并使用 Observable.fromCallable(),但效果不佳。

你能告诉我我该怎么做吗?

我正在使用 RxJava 2.0。

P.J*_*sch 5

一个基本的解决方案,我使用嵌套类FileObservableSource来生成数据,然后将 Observable 的创建推迟到 Observer 订阅:

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class StackOverflow {

    public static void main(String[] args) {
        final Observable<String> observable = Observable.defer(() -> new FileObservableSource("pom.xml"));
        observable.subscribe(
                line -> System.out.println("next line: " + line),
                        Throwable::printStackTrace,
                        () -> System.out.println("finished")
                );
    }

    static class FileObservableSource implements ObservableSource<String> {

        private final String filename;

        FileObservableSource(String filename) {
            this.filename = filename;
        }

        @Override
        public void subscribe(Observer<? super String> observer) {
            try {
                Files.lines(Paths.get(filename)).forEach(observer::onNext);
                observer.onComplete();
            } catch (IOException e) {
                observer.onError(e);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)


GVi*_*i82 2

您可以执行与此实现类似的操作。

然后该类在这里以这种方式使用:

public static Observable<byte[]> from(InputStream is, int size) {
     return Observable.create(new OnSubscribeInputStream(is, size));
}
Run Code Online (Sandbox Code Playgroud)

最终你可以使用它:

Observable<byte[]> chunks = Bytes.from(file, chunkSize);
Run Code Online (Sandbox Code Playgroud)

更多详细信息请参见此处