Joz*_*ard 5 java reactive-programming java-8 rx-java rx-java2
我对 RxJava 和响应式编程完全陌生。我有一个任务,我必须读取文件并将其存储到 Observable。我曾尝试在内部使用 BufferedReader 制作 Callable 并使用 Observable.fromCallable(),但效果不佳。
你能告诉我我该怎么做吗?
我正在使用 RxJava 2.0。
一个基本的解决方案,我使用嵌套类FileObservableSource
来生成数据,然后将 Observable 的创建推迟到 Observer 订阅:
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
public class StackOverflow {
public static void main(String[] args) {
final Observable<String> observable = Observable.defer(() -> new FileObservableSource("pom.xml"));
observable.subscribe(
line -> System.out.println("next line: " + line),
Throwable::printStackTrace,
() -> System.out.println("finished")
);
}
static class FileObservableSource implements ObservableSource<String> {
private final String filename;
FileObservableSource(String filename) {
this.filename = filename;
}
@Override
public void subscribe(Observer<? super String> observer) {
try {
Files.lines(Paths.get(filename)).forEach(observer::onNext);
observer.onComplete();
} catch (IOException e) {
observer.onError(e);
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
您可以执行与此实现类似的操作。
然后该类在这里以这种方式使用:
public static Observable<byte[]> from(InputStream is, int size) {
return Observable.create(new OnSubscribeInputStream(is, size));
}
Run Code Online (Sandbox Code Playgroud)
最终你可以使用它:
Observable<byte[]> chunks = Bytes.from(file, chunkSize);
Run Code Online (Sandbox Code Playgroud)
更多详细信息请参见此处。
归档时间: |
|
查看次数: |
9689 次 |
最近记录: |