Al *_*one 1 android observable rx-java
我对 Flowables 有问题并将它们添加到复合材料中。我想从 Observable 切换到 Flowable,因为该操作可能会发出 1000 个或更多的值。我对 rxjava2 有点缺乏经验,所以如果这个问题很愚蠢,请原谅我:)
到目前为止,我使用了这样的 observable:
public Observable<String> uploadPictureRx(String path)
{
return Observable.create(new ObservableOnSubscribe<String>()
{
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception
{
Uri file = Uri.fromFile(new File(path));
String segment = file.getLastPathSegment();
UploadTask uploadTask = reference.child("SomeChild").child(segment).putFile(file);
uploadTask.addOnFailureListener(new OnFailureListener()
{
@Override
public void onFailure(@NonNull Exception exception)
{
e.onError(exception);
}
}).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
{
@Override
public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
downloadUrl = taskSnapshot.getDownloadUrl();
String url = downloadUrl.getPath();
e.onNext(url);
e.onComplete();
}
}).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
{
@Override
public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
long bytes = taskSnapshot.getBytesTransferred();
String bytesS = String.valueOf(bytes);
e.onNext(bytesS);
}
});
}
});
}
Run Code Online (Sandbox Code Playgroud)
并像这样调用方法:
private void uploadPicToFireBaseStorage(String path)
{
compositeDisposable.add(storageService.uploadPictureRx(path)
.subscribeOn(Schedulers.io())
.observeOn(mainScheduler)
.subscribeWith(new DisposableObserver<String>()
{
@Override
public void onNext(String s)
{
String ss = s;
System.out.println(ss);
}
@Override
public void onError(Throwable e)
{
e.printStackTrace();
}
@Override
public void onComplete()
{
view.displayToast("Picture Upload completed");
}
})
);
}
Run Code Online (Sandbox Code Playgroud)
这工作正常!但是,当我尝试用 Flowable 而不是 observable 做同样的事情时,它不会编译:
public Flowable<String> uploadPictureRx(String path)
{
return Flowable.create(new FlowableOnSubscribe<String>()
{
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception
{
Uri file = Uri.fromFile(new File(path));
String segment = file.getLastPathSegment();
UploadTask uploadTask = reference.child("somechild").child(segment).putFile(file);
uploadTask.addOnFailureListener(new OnFailureListener()
{
@Override
public void onFailure(@NonNull Exception exception)
{
e.onError(exception);
}
}).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
{
@Override
public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
downloadUrl = taskSnapshot.getDownloadUrl();
String url = downloadUrl.getPath();
e.onNext(url);
e.onComplete();
}
}).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
{
@Override
public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
long bytes = taskSnapshot.getBytesTransferred();
String bytesS = String.valueOf(bytes);
e.onNext(bytesS);
}
});
}
}, BackpressureStrategy.BUFFER);
}
Run Code Online (Sandbox Code Playgroud)
错误是:类型参数“E”的推断类型“E”不在其范围内;应该实现 'org.reactivestreams.Subscriber
我的猜测是,Flowable 没有实现 Disposable,这就是它无法编译的原因。我不知道那是不是真的,只是我目前最好的猜测。或者我必须将 subscribeWith() 更改为 subscribe() 吗?我不知道这种变化会产生什么影响。
无论如何,建议如何进行这项工作并将这个 Flowable 放入我的复合材料中,真的很感激。
谢谢你们!
编辑:
尝试将 DisposableObserver 更改为订阅服务器。但这会导致以下错误: 编译器错误
由于背压的原因,Flowables 使用 Subscription 而不是 Disposable。基本上使用 Subscription.request() 方法来告诉 observable 我在那一刻想要多少个项目。
更改您的代码:
private void uploadPicToFireBaseStorage(String path)
{
compositeDisposable.add(storageService.uploadPictureRx(path)
.subscribeOn(Schedulers.io())
.observeOn(mainScheduler)
.subscribeWith(new DisposableObserver<String>()
{
@Override
public void onNext(String s)
{
String ss = s;
System.out.println(ss);
}
@Override
public void onError(Throwable e)
{
e.printStackTrace();
}
@Override
public void onComplete()
{
view.displayToast("Picture Upload completed");
}
})
);
}
Run Code Online (Sandbox Code Playgroud)
进入
private void uploadPicToFireBaseStorage(String path)
{
compositeDisposable.add(storageService.uploadPictureRx(path)
.subscribeOn(Schedulers.io())
.observeOn(mainScheduler)
.subscribeWith(new ResourceSubscriber<String>()
{
@Override
public void onNext(String s)
{
String ss = s;
System.out.println(ss);
}
@Override
public void onError(Throwable e)
{
e.printStackTrace();
}
@Override
public void onComplete()
{
view.displayToast("Picture Upload completed");
}
})
);
}
Run Code Online (Sandbox Code Playgroud)
Flowable根据作品的发布-订阅模式,即发行商-用户模式
然而 ,
Observable根据观察者模式工作
在pub-sub 模式中有一个中间事件通道,它保存由 发布的数据publisher,然后event channel 发出数据并subscriber获取数据onNext(...)。
而在观察者模式中,observable直接发出数据或将数据直接抛出到observer. 这可能会产生背压。(因为它一次性发出全部数据。)
所以用 ( Flowable)
.subscribeWith(new ResourceSubscriber<>) // in case of flowable
Run Code Online (Sandbox Code Playgroud)
或者,
.subscribeWith(new DisposableSubscriber<>)
Run Code Online (Sandbox Code Playgroud)
而在 ( observable) 的情况下使用
.subscribeWith(new ResourceObserver<>)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6728 次 |
| 最近记录: |