我在我的应用程序中遇到了一个问题,我在许多不同订阅的onError中检查特定错误(假设错误9000).所有这些都可能会或可能不会以相同的方式处理错误.而不是检查这些订阅的OnError中是否(错误== 9000)是否有一种方法来创建一个自定义的Observable或运算符,专门检查此错误或类似.doOn9000Error()
简化代码如下,s是与我的终点的返回类型匹配的订户.无论我做什么,我都会在主线程异常上获得一个网络.有些教程甚至没有提到这里的线程,说当你使用rxjava时,retrofit会自动处理它,但事情似乎对我来说是另一回事,我似乎无法将它从主线程中解脱出来.
PostInterface p = getRestAdapter().create(PostInterface.class);
p.getFeedForUser()
.observeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(s);
Run Code Online (Sandbox Code Playgroud)
编辑:
我也试过这段代码(发布在下面):
p.getFeedForUser()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.newThread())
.subscribe(s);
Run Code Online (Sandbox Code Playgroud)
我也尝试过没有订阅线程.我每次都在主线程异常上获得网络.
我正在学习如何在Android上使用RxJava.在我的项目中,我有一个单一的情况,我List从服务器获取一个项目,我需要获取每个项目的详细信息,然后将响应返回到UI.
问题是:我不确定如何sync在内部调用API调用for loop.我希望在foor loop完成所有调用之后,最终结果返回到UI .
任何提示都非常感谢.
示例代码:
Observable<List<Favorite>> result = ApiWrapper.getFavorites(userData)
.flatMap(new Func1<List<Favorite>, Observable<List<Favorite>>>() {
@Override
public Observable<List<Favorite>> call(@NonNull final List<Favorite> response) {
for(int i = 0; i < response.size(); i++){
getDetailsForFavoriteItem(response.get(i));
}
return Observable.just(response);
}
});
private Observable<Favorite> getDetailsForFavoriteItem (Favorite item){
return ApiWrapper.getDetails(item.getId())
.flatMap(new Func1<FavoriteDetailsResponse, Observable<Favorite>>() {
@Override
public Observable<Favorite> call(@NonNull final FavoriteDetailsResponse response) {
item.setParam1(response.getParam1());
item.setParam2(response.getParam2);
//so on
return Observable.just(item)//!!!issue is here ! I want this return to …Run Code Online (Sandbox Code Playgroud) 以下代码
package com.inthemoon.snippets.rxjava;
import io.reactivex.*;
public class HelloWorld {
public static void main(String[] args) {
Flowable.just("Hello world").subscribe(System.out::println);
}
}
Run Code Online (Sandbox Code Playgroud)
导致以下编译错误
错误:(9、15)Java:无法访问org.reactivestreams.Publisher的org.reactivestreams.Publisher类文件
POM依赖关系如下
<dependencies>
<!-- https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava -->
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.0.4</version>
</dependency>
</dependencies>
Run Code Online (Sandbox Code Playgroud) 我有一个Observable,当它被添加到db时,它正在监听数据库并发出项目.当我订阅这个observable时,它会逐个在db中快速发出已存储的项目.我的问题是我可以创建observable来收集以一定间隔(例如100毫秒)发出的项目到列表并发出(或返回一些函数,如doOnNext)整个列表和单独的项目,如果有更大的发射间隔?
预先感谢!
我尝试使用rx和PublishSubject.所以我创建了对象:
PublishSubject<MyEvent> events = PublishSubject.create();
在我的服务中,我把MyEvent它放进去.
events.onNext(new MyEvent);
所以有时候我没有收到第一个活动,但是我收到了第二个活动.有时我收到所有活动.任何人都可以解释,这可能是什么问题?
我有这个问题;)
我试图调用这个用例,最后返回一个Observable.
但是,尽管使用调度程序,仍然在主线程上调用.我不知道为什么:
它看起来像这样:
class MainViewModel @Inject constructor(private val loadNewsUseCase: LoadNews) : Model {
override fun loadNews() {
loadNewsUseCase.execute(NewsObserver(), "")
}
override fun dispose() {
loadNewsUseCase.dispose()
}
}
class NewsObserver : DisposableObserver<Result>() {
override fun onComplete() {
Log.i("TAG", "")
}
override fun onNext(t: Result) {
Log.i("TAG", "")
}
override fun onError(e: Throwable) {
Log.i("TAG", "")
}
}
Run Code Online (Sandbox Code Playgroud)
-
abstract class UseCase<T, in P>(
private val computationThreadExecutor: ComputationThreadExecutor,
private val mainThreadExecutor: MainThreadExecutor,
private val compositeDisposable: CompositeDisposable = CompositeDisposable()
) {
abstract …Run Code Online (Sandbox Code Playgroud) 我有许多Observable用于我的应用程序中的网络请求.由于这么多是相同的,我对它们应用了一个Observable转换:
/**
* Creates a transformer that applies the schedulers and error handling for all of the observables in this ViewModel.
*/
private fun applyTransformations(): Observable.Transformer<NetworkState, NetworkState> {
return Observable.Transformer { observable ->
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.onErrorReturn { NetworkState.Error(it) }
.doOnNext { publishState(it) }
.startWith(NetworkState.Loading())
}
}
Run Code Online (Sandbox Code Playgroud)
我试图通过以上目标实现的目标:
这很好用,但我注意到的是,当我调用startWith和加载状态时,它实际上从未实际处理过doOnNext().换句话说,publishState()永远不会调用我的加载状态.
在我设置observable的地方,我不打算添加订阅者,因为doOnNext()以上就是我需要的所有内容:
val subscription = repository.getInstagramPhotos(count)
.map { mapIGPhotoResponse(it) }
.compose(applyTransformations())
.subscribe()
Run Code Online (Sandbox Code Playgroud)
但是,如果我要提供上面的订户,它将处理加载状态.它还将处理两个onNext()调用 - 一个用于提供的订户,另一个用于doOnNext转换.
有没有办法修改此 …
我一直在尝试使用Android中的RxJava,但我想弄清楚fromCallable和Just之间的区别.两者都只接收一次数据,而Create则可以多次接收数据.这是我用来试验的代码:
public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
listenCallable();
listenJust();
}
private void listenCallable() {
CompositeDisposable compositeDisposable = new CompositeDisposable();
Disposable disposable = Shooter.getCallable().subscribe(i -> {
Log.d("Tag", "Listen Callable: " + i);
});
compositeDisposable.add(disposable);
compositeDisposable.dispose();
}
private void listenJust() {
CompositeDisposable compositeDisposable = new CompositeDisposable();
Disposable disposable = Shooter.getJust("Jay").subscribe(i -> {
Log.d("Tag", "Listen Just " + i);
});
compositeDisposable.add(disposable);
compositeDisposable.dispose();
}
}
Run Code Online (Sandbox Code Playgroud)
以及发出数据的类:
public class Shooter {
public static Observable<String> getCallable() {
return …Run Code Online (Sandbox Code Playgroud) 我正在使用RxJava/Kotlin Observable#take()从列表中获取前50个项目.但是#take()运算符的行为并不像Rx docs那样.
在Rx docs中,#take()定义为:
"只发出一个Observable发出的前n个项目"
我有这样的功能:
我们可以看到pageSize论证是50
最初size的list是300
之后#take(50)应用于那个Observable和下一个断点我仍然得到完整的大小列表i.e. size = 300
但是just for the check,如果事情是错的调试器或观察到的,我想只需要其显示名中含有"9"的项目,但这次我得到的预期结果smaller list与9处于它们的#displayName field.
我相信RxJava/Kotlin's #take()运营商并不那么疯狂,而且只是我.