如何从RxJava中的输入流创建Observable字节序列

Nir*_*jan 6 java rx-java

我是RxJava的新手,因此,问这个问题.我有一个输入流,我必须转换为特定大小的字节数组序列.就像是:

Observable
  .just(inputStream)
  .map(new Func1<InputStream, Chunk>());
Run Code Online (Sandbox Code Playgroud)

Chunk是一个自定义类,包含从流中读取的字节数.有人可以帮我理解如何在RxJava中执行此操作

yur*_*gis 1

您可以使用 Observable.create,然后使用 flatMap。请注意,默认情况下 QueuedProducer 是无界的,您可以提供包含有界队列的自定义实现。

例如:

  static class Chunk {
    byte[] buf;
    int size;
    int index;
    public Chunk(byte[] buf, int size, int index) {
      this.buf = buf;
      this.size = size;
      this.index = index;
    }

  }

  FileInputStream fis = ...
  Observable<Chunk> o = Observable.just(fis).flatMap(new Func1<InputStream, Observable<Chunk>>() {

  @Override
  public Observable<Chunk> call(InputStream is) {
    return Observable.create(new Observable.OnSubscribe<Chunk>() {

      public void call(Subscriber<? super Chunk> subscriber) {
        final QueuedProducer<Chunk> producer = new QueuedProducer<>(subscriber);
        subscriber.setProducer(producer);
        try {
          int size = 0;
          int index = 0;
          do {
            byte[] buf = new byte[4096];
            size = is.read(buf);
            if (size > 0) {
              Chunk chunk = new Chunk(buf, size, index++);
              System.out.println("Producing chunk #" + index + " of size: " + chunk.size);
              producer.onNext(chunk);
            }
          } while (size >= 0);
          producer.onCompleted();
        } catch (IOException e) {
          producer.onError(e);
        } finally {
          try {
            System.out.println("Closing stream");
            is.close();
          } catch (IOException e) {
          }
        }
      }
    })
    .subscribeOn(Schedulers.io());
 }
});

o.subscribe(new Action1<Chunk>() {

  @Override
  public void call(Chunk chunk) {
    System.out.println("Received chunk #" + chunk.index + " of size: " + chunk.size);

  }

});

Thread.sleep(10000);
Run Code Online (Sandbox Code Playgroud)