我是RxJava的新手,因此,问这个问题.我有一个输入流,我必须转换为特定大小的字节数组序列.就像是:
Observable
.just(inputStream)
.map(new Func1<InputStream, Chunk>());
Run Code Online (Sandbox Code Playgroud)
这Chunk是一个自定义类,包含从流中读取的字节数.有人可以帮我理解如何在RxJava中执行此操作
您可以使用 Observable.create,然后使用 flatMap。请注意,默认情况下 QueuedProducer 是无界的,您可以提供包含有界队列的自定义实现。
例如:
static class Chunk {
byte[] buf;
int size;
int index;
public Chunk(byte[] buf, int size, int index) {
this.buf = buf;
this.size = size;
this.index = index;
}
}
FileInputStream fis = ...
Observable<Chunk> o = Observable.just(fis).flatMap(new Func1<InputStream, Observable<Chunk>>() {
@Override
public Observable<Chunk> call(InputStream is) {
return Observable.create(new Observable.OnSubscribe<Chunk>() {
public void call(Subscriber<? super Chunk> subscriber) {
final QueuedProducer<Chunk> producer = new QueuedProducer<>(subscriber);
subscriber.setProducer(producer);
try {
int size = 0;
int index = 0;
do {
byte[] buf = new byte[4096];
size = is.read(buf);
if (size > 0) {
Chunk chunk = new Chunk(buf, size, index++);
System.out.println("Producing chunk #" + index + " of size: " + chunk.size);
producer.onNext(chunk);
}
} while (size >= 0);
producer.onCompleted();
} catch (IOException e) {
producer.onError(e);
} finally {
try {
System.out.println("Closing stream");
is.close();
} catch (IOException e) {
}
}
}
})
.subscribeOn(Schedulers.io());
}
});
o.subscribe(new Action1<Chunk>() {
@Override
public void call(Chunk chunk) {
System.out.println("Received chunk #" + chunk.index + " of size: " + chunk.size);
}
});
Thread.sleep(10000);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4214 次 |
| 最近记录: |