标签: rx-java

RxJava:我可以覆盖OnError或创建一个处理特定错误的自定义Observable

我在我的应用程序中遇到了一个问题,我在许多不同订阅的onError中检查特定错误(假设错误9000).所有这些都可能会或可能不会以相同的方式处理错误.而不是检查这些订阅的OnError中是否(错误== 9000)是否有一种方法来创建一个自定义的Observable或运算符,专门检查此错误或类似.doOn9000Error()

android reactive-programming rx-java

1
推荐指数
1
解决办法
959
查看次数

使用rxjava我无法从主线程中进行改造

简化代码如下,s是与我的终点的返回类型匹配的订户.无论我做什么,我都会在主线程异常上获得一个网络.有些教程甚至没有提到这里的线程,说当你使用rxjava时,retrofit会自动处理它,但事情似乎对我来说是另一回事,我似乎无法将它从主线程中解脱出来.

PostInterface p = getRestAdapter().create(PostInterface.class);

p.getFeedForUser()
    .observeOn(Schedulers.newThread())
    .subscribeOn(AndroidSchedulers.mainThread())
    .subscribe(s);
Run Code Online (Sandbox Code Playgroud)

编辑:

我也试过这段代码(发布在下面):

p.getFeedForUser()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.newThread())
    .subscribe(s);
Run Code Online (Sandbox Code Playgroud)

我也尝试过没有订阅线程.我每次都在主线程异常上获得网络.

android rx-java retrofit

1
推荐指数
1
解决办法
1475
查看次数

RxJava for循环与异步方法?

我正在学习如何在Android上使用RxJava.在我的项目中,我有一个单一的情况,我List从服务器获取一个项目,我需要获取每个项目的详细信息,然后将响应返回到UI.

问题是:我不确定如何sync在内部调用API调用for loop.我希望在foor loop完成所有调用之后,最终结果返回到UI .

任何提示都非常感谢.

示例代码:

Observable<List<Favorite>> result = ApiWrapper.getFavorites(userData)
            .flatMap(new Func1<List<Favorite>, Observable<List<Favorite>>>() {
              @Override
              public Observable<List<Favorite>> call(@NonNull final List<Favorite> response) {
                for(int i = 0; i < response.size(); i++){
                  getDetailsForFavoriteItem(response.get(i));
                }
                return Observable.just(response);              
              }
            });

private Observable<Favorite> getDetailsForFavoriteItem (Favorite item){
  return ApiWrapper.getDetails(item.getId())
        .flatMap(new Func1<FavoriteDetailsResponse, Observable<Favorite>>() {
              @Override
              public Observable<Favorite> call(@NonNull final FavoriteDetailsResponse response) {
                item.setParam1(response.getParam1());
                item.setParam2(response.getParam2);
                //so on
                return Observable.just(item)//!!!issue is here ! I want this return to …
Run Code Online (Sandbox Code Playgroud)

android rx-java

1
推荐指数
1
解决办法
1708
查看次数

从RxJava编译示例时找不到org.reactivestreams.Publisher的类文件?

以下代码

package com.inthemoon.snippets.rxjava;

import io.reactivex.*;

public class HelloWorld {

   public static void main(String[] args) {
      Flowable.just("Hello world").subscribe(System.out::println);
   }

}
Run Code Online (Sandbox Code Playgroud)

导致以下编译错误

错误:(9、15)Java:无法访问org.reactivestreams.Publisher的org.reactivestreams.Publisher类文件

POM依赖关系如下

<dependencies>
        <!-- https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava -->
        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.0.4</version>
        </dependency>


    </dependencies>
Run Code Online (Sandbox Code Playgroud)

java maven rx-java rx-java2

1
推荐指数
2
解决办法
5251
查看次数

RxJava 2:在一段时间后发出收集的项目列表

我有一个Observable,当它被添加到db时,它正在监听数据库并发出项目.当我订阅这个observable时,它会逐个在db中快速发出已存储的项目.我的问题是我可以创建observable来收集以一定间隔(例如100毫秒)发出的项目到列表并发出(或返回一些函数,如doOnNext)整个列表和单独的项目,如果有更大的发射间隔?

预先感谢!

java android rx-java rx-java2

1
推荐指数
1
解决办法
589
查看次数

为什么PublishSubject不会发出第一个元素

我尝试使用rx和PublishSubject.所以我创建了对象:

PublishSubject<MyEvent> events = PublishSubject.create();

在我的服务中,我把MyEvent它放进去.

events.onNext(new MyEvent);

所以有时候我没有收到第一个活动,但是我收到了第二个活动.有时我收到所有活动.任何人都可以解释,这可能是什么问题?

android rx-java

1
推荐指数
1
解决办法
625
查看次数

RxAndroid从主线程调用

我有这个问题;)

我试图调用这个用例,最后返回一个Observable.

但是,尽管使用调度程序,仍然在主线程上调用.我不知道为什么:

它看起来像这样:

class MainViewModel @Inject constructor(private val loadNewsUseCase: LoadNews) : Model {

    override fun loadNews() {
        loadNewsUseCase.execute(NewsObserver(), "")
    }

    override fun dispose() {
        loadNewsUseCase.dispose()
    }
}

class NewsObserver : DisposableObserver<Result>() {
    override fun onComplete() {
        Log.i("TAG", "")
    }

    override fun onNext(t: Result) {
        Log.i("TAG", "")

    }

    override fun onError(e: Throwable) {
        Log.i("TAG", "")

    }
}
Run Code Online (Sandbox Code Playgroud)

-

abstract class UseCase<T, in P>(
        private val computationThreadExecutor: ComputationThreadExecutor,
        private val mainThreadExecutor: MainThreadExecutor,
        private val compositeDisposable: CompositeDisposable = CompositeDisposable()
) {

    abstract …
Run Code Online (Sandbox Code Playgroud)

android rx-java rx-android rx-java2

1
推荐指数
1
解决办法
351
查看次数

doOnNext中不会发出可观察的startWith

我有许多Observable用于我的应用程序中的网络请求.由于这么多是相同的,我对它们应用了一个Observable转换:

/**
 * Creates a transformer that applies the schedulers and error handling for all of the observables in this ViewModel.
 */
private fun applyTransformations(): Observable.Transformer<NetworkState, NetworkState> {
    return Observable.Transformer { observable ->
        observable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .onErrorReturn { NetworkState.Error(it) }
                .doOnNext { publishState(it) }
                .startWith(NetworkState.Loading())
    }
}
Run Code Online (Sandbox Code Playgroud)

我试图通过以上目标实现的目标:

  • 应用一致的调度程序
  • 通过返回我的密封类的实例来处理任何错误.
  • 通过发布observable返回的状态来处理任何onNext.
  • 通过发送加载状态开始.

这很好用,但我注意到的是,当我调用startWith和加载状态时,它实际上从未实际处理过doOnNext().换句话说,publishState()永远不会调用我的加载状态.

在我设置observable的地方,我不打算添加订阅者,因为doOnNext()以上就是我需要的所有内容:

val subscription = repository.getInstagramPhotos(count)
        .map { mapIGPhotoResponse(it) }
        .compose(applyTransformations())
        .subscribe()
Run Code Online (Sandbox Code Playgroud)

但是,如果我要提供上面的订户,它将处理加载状态.它还将处理两个onNext()调用 - 一个用于提供的订户,另一个用于doOnNext转换.

有没有办法修改此 …

android rx-java

1
推荐指数
1
解决办法
236
查看次数

Android RxJava 2:fromCallable和Just有什么区别?

我一直在尝试使用Android中的RxJava,但我想弄清楚fromCallable和Just之间的区别.两者都只接收一次数据,而Create则可以多次接收数据.这是我用来试验的代码:

public class MainActivity extends AppCompatActivity {
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        listenCallable();
        listenJust();
    }
    private void listenCallable() {
        CompositeDisposable compositeDisposable = new CompositeDisposable();
        Disposable disposable = Shooter.getCallable().subscribe(i -> {
            Log.d("Tag", "Listen Callable: " + i);
        });
        compositeDisposable.add(disposable);
        compositeDisposable.dispose();
    }

    private void listenJust() {
        CompositeDisposable compositeDisposable = new CompositeDisposable();
        Disposable disposable = Shooter.getJust("Jay").subscribe(i -> {
            Log.d("Tag", "Listen Just " + i);
        });
        compositeDisposable.add(disposable);
        compositeDisposable.dispose();
    }
}
Run Code Online (Sandbox Code Playgroud)

以及发出数据的类:

public class Shooter {
    public static Observable<String> getCallable() {
        return …
Run Code Online (Sandbox Code Playgroud)

java android rx-java

1
推荐指数
1
解决办法
902
查看次数

Observable#take(Long)没有返回RxJava中所需的项目大小

我正在使用RxJava/Kotlin Observable#take()从列表中获取前50个项目.但是#take()运算符的行为并不像Rx docs那样.

在Rx docs中,#take()定义为:

"只发出一个Observable发出的前n个项目"

在此输入图像描述

我有这样的功能:

我们可以看到pageSize论证是50

在此输入图像描述

最初sizelist300

在此输入图像描述

之后#take(50)应用于那个Observable和下一个断点我仍然得到完整的大小列表i.e. size = 300

在此输入图像描述

但是just for the check,如果事情是错的调试器或观察到的,我想只需要其显示名中含有"9"的项目,但这次我得到的预期结果smaller list9处于它们的#displayName field.

在此输入图像描述

我相信RxJava/Kotlin's #take()运营商并不那么疯狂,而且只是我.

java kotlin rx-java rx-android rx-kotlin

1
推荐指数
1
解决办法
112
查看次数