Android rxjava2 Flowable with Compositedisposable

Al *_*one 1 android observable rx-java

我对 Flowables 有问题并将它们添加到复合材料中。我想从 Observable 切换到 Flowable,因为该操作可能会发出 1000 个或更多的值。我对 rxjava2 有点缺乏经验,所以如果这个问题很愚蠢,请原谅我:)

到目前为止,我使用了这样的 observable:

 public Observable<String> uploadPictureRx(String path)
    {
        return Observable.create(new ObservableOnSubscribe<String>()
        {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception
            {
                Uri file = Uri.fromFile(new File(path));
                String segment = file.getLastPathSegment();
                UploadTask uploadTask = reference.child("SomeChild").child(segment).putFile(file);


                uploadTask.addOnFailureListener(new OnFailureListener()
                {
                    @Override
                    public void onFailure(@NonNull Exception exception)
                    {
                        e.onError(exception);
                    }
                }).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
                {
                    @Override
                    public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
                    {                           
                        //noinspection VisibleForTests
                        downloadUrl = taskSnapshot.getDownloadUrl();
                        String url = downloadUrl.getPath();
                        e.onNext(url);
                        e.onComplete();
                    }
                }).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
                {
                    @Override
                    public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
                    {
                        //noinspection VisibleForTests
                        long bytes = taskSnapshot.getBytesTransferred();
                        String bytesS = String.valueOf(bytes);
                        e.onNext(bytesS);
                    }
                });
            }
        });
    }
Run Code Online (Sandbox Code Playgroud)

并像这样调用方法:

private void uploadPicToFireBaseStorage(String path)
    {
        compositeDisposable.add(storageService.uploadPictureRx(path)
                .subscribeOn(Schedulers.io())
                .observeOn(mainScheduler)
                .subscribeWith(new DisposableObserver<String>()
                {

                    @Override
                    public void onNext(String s)
                    {
                        String ss = s;
                        System.out.println(ss);
                    }

                    @Override
                    public void onError(Throwable e)
                    {
                        e.printStackTrace();
                    }

                    @Override
                    public void onComplete()
                    {
                        view.displayToast("Picture Upload completed");
                    }
                })
        );
    }
Run Code Online (Sandbox Code Playgroud)

这工作正常!但是,当我尝试用 Flowable 而不是 observable 做同样的事情时,它不会编译:

public Flowable<String> uploadPictureRx(String path)
    {
        return Flowable.create(new FlowableOnSubscribe<String>()
        {

            @Override
            public void subscribe(FlowableEmitter<String> e) throws Exception
            {
                Uri file = Uri.fromFile(new File(path));
                String segment = file.getLastPathSegment();
                UploadTask uploadTask = reference.child("somechild").child(segment).putFile(file);    

                uploadTask.addOnFailureListener(new OnFailureListener()
                {
                    @Override
                    public void onFailure(@NonNull Exception exception)
                    {
                        e.onError(exception);
                    }
                }).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
                {
                    @Override
                    public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
                    {                           
                        //noinspection VisibleForTests
                        downloadUrl = taskSnapshot.getDownloadUrl();
                        String url = downloadUrl.getPath();
                        e.onNext(url);
                        e.onComplete();
                    }
                }).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
                {
                    @Override
                    public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
                    {
                        //noinspection VisibleForTests
                        long bytes = taskSnapshot.getBytesTransferred();
                        String bytesS = String.valueOf(bytes);
                        e.onNext(bytesS);
                    }
                });
            }
        }, BackpressureStrategy.BUFFER);

    }
Run Code Online (Sandbox Code Playgroud)

错误是:类型参数“E”的推断类型“E”不在其范围内;应该实现 'org.reactivestreams.Subscriber

我的猜测是,Flowable 没有实现 Disposable,这就是它无法编译的原因。我不知道那是不是真的,只是我目前最好的猜测。或者我必须将 subscribeWith() 更改为 subscribe() 吗?我不知道这种变化会产生什么影响。

无论如何,建议如何进行这项工作并将这个 Flowable 放入我的复合材料中,真的很感激。

谢谢你们!

编辑:

尝试将 DisposableObserver 更改为订阅服务器。但这会导致以下错误: 编译器错误

Pho*_*ang 5

由于背压的原因,Flowables 使用 Subscription 而不是 Disposable。基本上使用 Subscription.request() 方法来告诉 observable 我在那一刻想要多少个项目。

更改您的代码:

private void uploadPicToFireBaseStorage(String path)
{
    compositeDisposable.add(storageService.uploadPictureRx(path)
            .subscribeOn(Schedulers.io())
            .observeOn(mainScheduler)
            .subscribeWith(new DisposableObserver<String>()
            {

                @Override
                public void onNext(String s)
                {
                    String ss = s;
                    System.out.println(ss);
                }

                @Override
                public void onError(Throwable e)
                {
                    e.printStackTrace();
                }

                @Override
                public void onComplete()
                {
                    view.displayToast("Picture Upload completed");
                }
            })
    );
}
Run Code Online (Sandbox Code Playgroud)

进入

private void uploadPicToFireBaseStorage(String path)
{
    compositeDisposable.add(storageService.uploadPictureRx(path)
            .subscribeOn(Schedulers.io())
            .observeOn(mainScheduler)
            .subscribeWith(new ResourceSubscriber<String>()
            {
                @Override
                public void onNext(String s)
                {
                    String ss = s;
                    System.out.println(ss);
                }

                @Override
                public void onError(Throwable e)
                {
                    e.printStackTrace();
                }

                @Override
                public void onComplete()
                {
                    view.displayToast("Picture Upload completed");
                }
            })
    );
}
Run Code Online (Sandbox Code Playgroud)


San*_*Sur 5

Flowable根据作品的发布-订阅模式,即发行商-用户模式

然而 ,

Observable根据观察者模式工作

pub-sub 模式中有一个中间事件通道,它保存由 发布的数据publisher,然后event channel 发出数据并subscriber获取数据onNext(...)

而在观察者模式中,observable直接发出数据或将数据直接抛出到observer. 这可能会产生背压。(因为它一次性发出全部数据。)

所以用 ( Flowable)

 .subscribeWith(new ResourceSubscriber<>) // in case of flowable
Run Code Online (Sandbox Code Playgroud)

或者,

 .subscribeWith(new DisposableSubscriber<>) 
Run Code Online (Sandbox Code Playgroud)

而在 ( observable) 的情况下使用

  .subscribeWith(new ResourceObserver<>)  
Run Code Online (Sandbox Code Playgroud)