我正试图拥抱RxJava的荣耀并将其集成到我的应用程序中.我已经编写了以下代码来添加漫画,其累积成本不超过定义的预算.为实现这一目标,我编写了2个实现.
Observable.create()不鼓励的用途主要是因为订阅和背压的复杂性如果暂时将Subscription和Backpressure处理放在一边,我希望得到关于哪些实现在性能,内存消耗和简单性方面更好的反馈Observable.create()?
第一次实施:
Observable<Integer> filterObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Timber.d("filterComicsAccordingToBudget():subscribe");
int pageCountOfComicsWithInBudget = 0;
double totalCost = 0.0;
for(MarvelComic comic : getMarvelComicsList()) {
totalCost += Double.valueOf(comic.getPrice());
Timber.d("totalCost: %s budget: %s priceOfComic: %s", totalCost, budget, comic.getPrice());
if(totalCost > budget) {
break;
}
pageCountOfComicsWithInBudget += Integer.valueOf(comic.getPageCount());
Timber.d("pageCount: %s price: %s comicName: %s totalPages: %s", comic.getPageCount(), comic.getPrice(), comic.getTitle(), pageCountOfComicsWithInBudget);
e.onNext(pageCountOfComicsWithInBudget);
}
e.onComplete();
}
});
filterObservable.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() { …Run Code Online (Sandbox Code Playgroud) 我有一个WS,如果成功则返回200,没有任何正文,否则返回420,错误正文中的json不成功
返回类型为
Observable<Response<Void>>
Run Code Online (Sandbox Code Playgroud)
出于某种原因,如果出现420代码错误,onNext(Response<Void> value)则调用onError(Throwable e)该代码,而不是针对其他任何不成功的请求调用该代码。
为什么仅在这种情况下才调用onNext而不是onError?如果请求未返回200,则可以调用onError?
我有一个Observable,当它被添加到db时,它正在监听数据库并发出项目.当我订阅这个observable时,它会逐个在db中快速发出已存储的项目.我的问题是我可以创建observable来收集以一定间隔(例如100毫秒)发出的项目到列表并发出(或返回一些函数,如doOnNext)整个列表和单独的项目,如果有更大的发射间隔?
预先感谢!
我是新手使用rxjava并且我正在尝试使用rxjava2在后台运行一个函数,但该方法未被调用我正在使用的代码如下所示让我知道它是否是在后台执行函数的正确方法:
Observable.fromCallable<OrderItem>(Callable {
saveToDb(existingQty, newOty, product_id)
}).doOnSubscribe {
object : Observable<OrderItem>() {
override fun subscribeActual(emitter: Observer<in OrderItem>?) {
try {
val orderItem = saveToDb(existingQty, newOty, product_id)
emitter?.onNext(orderItem)
emitter?.onComplete()
} catch (e: Exception) {
emitter?.onError(e)
}
}
}
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).doOnSubscribe {
object : Observer<OrderItem> {
override fun onComplete() {
}
override fun onNext(t: OrderItem) {
}
override fun onError(e: Throwable) {
}
override fun onSubscribe(d: Disposable) {
}
}
}
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用Rx方式使用房间从数据库中检索数据.这就是我试图这样做的方式
override fun onStart() {
super.onStart()
disposable.add(presenter.getAllBooks()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
println(it.size())
}))
}
Run Code Online (Sandbox Code Playgroud)
这是getAllBooks()演示者内部的方法
fun getAllBooks() : Flowable<List<Book>> {
val isMainThread = Looper.myLooper() == Looper.getMainLooper()
if (!isMainThread) {
updateBooks()
return db.bookDao().allBooks
}
return Flowable.empty()
}
Run Code Online (Sandbox Code Playgroud)
这里isMainThread变量总是true,我也试过observeOn(Shcedulers.io()),但同样的问题.
我有以下@Dao,提供Flowable<User>流:
@Dao
interface UsersDao {
@Query("SELECT * FROM users")
fun loadUsers(): Flowable<List<User>>
}
Run Code Online (Sandbox Code Playgroud)
我希望流的订户在那里发生一些变化时立即接收数据库的更新.订阅Room Flowable我将获得开箱即用的功能.
我想要的是:如果数据库为空我想执行Web请求并将用户保存到数据库中.订阅者将自动接收刚刚发生的新更新.
现在我希望存储库的客户端不要知道所有的初始化逻辑:他所做的一切 - 他执行usersRepository.loadUsers().所有这些魔法应该发生在存储库类中:
class UsersRepository @Inject constructor(
private val api: Api,
private val db: UsersDao
) {
fun loadUsers(): Flowable<List<User>> {
...
}
}
Run Code Online (Sandbox Code Playgroud)
当然我可以使用以下方法:
fun loadUsers(): Flowable<List<User>> {
return db.loadTables()
.doOnSubscribe {
if (db.getCount() == 0) {
val list = api.getTables().blockingGet()
db.insert(list)
}
}
}
Run Code Online (Sandbox Code Playgroud)
但我想在不使用副作用(doOn...运算符)的情况下构造流.我尝试过,composing()但没有多大帮助.被困在这一段时间了.
在RxJava1中,flatmap有一个重载的方法,允许您保留源值并将其传递到流中。
我从以下博客文章中获得了这些知识
但是,转到RxJava2,我似乎找不到它。我检查了Rx1和Rx2的更改,但未列出。我想知道它是否仍然存在,但也许我找的地方不对。
我正在使用Single顺便说一句。
我有这个问题;)
我试图调用这个用例,最后返回一个Observable.
但是,尽管使用调度程序,仍然在主线程上调用.我不知道为什么:
它看起来像这样:
class MainViewModel @Inject constructor(private val loadNewsUseCase: LoadNews) : Model {
override fun loadNews() {
loadNewsUseCase.execute(NewsObserver(), "")
}
override fun dispose() {
loadNewsUseCase.dispose()
}
}
class NewsObserver : DisposableObserver<Result>() {
override fun onComplete() {
Log.i("TAG", "")
}
override fun onNext(t: Result) {
Log.i("TAG", "")
}
override fun onError(e: Throwable) {
Log.i("TAG", "")
}
}
Run Code Online (Sandbox Code Playgroud)
-
abstract class UseCase<T, in P>(
private val computationThreadExecutor: ComputationThreadExecutor,
private val mainThreadExecutor: MainThreadExecutor,
private val compositeDisposable: CompositeDisposable = CompositeDisposable()
) {
abstract …Run Code Online (Sandbox Code Playgroud) 我有一门课Student,它有两个字段grade和school。这两个字段都需要从远程服务器中获取。当返回两个结果时,我新建了一个Student对象。
在的帮助下RxJava,我以两种方式完成了此操作,一种是flatMapin zip运算符,另一种是in 运算符。
Observable<String> gradeObservable =
Observable.create((ObservableOnSubscribe<String>) emitter -> {
Thread.sleep(1000);
emitter.onNext("senior");
}).subscribeOn(Schedulers.io());
Observable<String> schoolObservable =
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("MIT");
}).subscribeOn(Schedulers.io());
Run Code Online (Sandbox Code Playgroud)
gradeObservable
.flatMap(grade ->
schoolObservable.map(school -> {
Student student = new Student();
student.grade = grade;
student.school = school;
return student;
}))
.subscribe(student -> {
System.out.println(student.grade);
System.out.println(student.school);
});
Run Code Online (Sandbox Code Playgroud)
Observable.zip(gradeObservable, schoolObservable, (grade, school) -> {
Student student = new Student();
student.grade = grade;
student.school = …Run Code Online (Sandbox Code Playgroud) 我已经为下课写了单元测试。测试中的TestObserver无法接收任何数据。对于需要做什么我一无所知。
public class DataLocationMonitor {
private PublishSubject<Object> dataLocatorSubject;
public DataLocationMonitor(...) {
this.dataLocatorSubject = PublishSubject.create();
}
public Observable<Object> getObservable() {
return this.dataLocatorSubject;
}
public void handleData(Object data) {
if (data instanceof DataMessage) {
DataMessage message = new DataMessage(...);
this.dataLocatorSubject.onNext(message);
}
}
}
@RunWith(MockitoJUnitRunner.class)
public class DataLocationMonitorTest {
private DataLocationMonitor target;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
this.target = new DataLocationMonitor(...);
}
@Test
public void handleData_dataLocatorSubjectOnNextCalled() {
TestObserver<Object> observer= TestObserver.create();
Observable<Object> dataLocatorSubject = this.target.getObservable();
DataMessage data = new DataMessage();
this.target.handleData(data);
dataLocatorSubject.subscribe(observer);
observer.assertSubscribed(); …Run Code Online (Sandbox Code Playgroud) rx-java2 ×10
android ×8
rx-java ×7
java ×3
kotlin ×3
rx-android ×3
android-room ×2
reactive ×1
retrofit ×1
retrofit2 ×1