标签: rx-java2

RxJava:Observable和默认线程

我有以下代码:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        s.onNext("1");
                        s.onComplete();
                    }
                });
                thread.setName("background-thread-1");
                thread.start();
            }
        }).map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("map: thread=" + threadName);
                return "map-" + s;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onNext(String s) {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("onNext: thread=" …
Run Code Online (Sandbox Code Playgroud)

java rx-java rx-java2

8
推荐指数
1
解决办法
5582
查看次数

如何在单元测试中处理模拟的RxJava2可观察抛出异常

过去几周我一直在使用MVP在Android中在Kotlin做TDD.事情进展顺利.

我使用Mockito来模拟类,但我似乎无法克服如何实现我想要运行的测试之一.

以下是我的测试:

  1. 调用api,接收数据列表,然后显示列表. loadAllPlacesTest()
  2. 调用api,接收空数据,然后显示列表. loadEmptyPlacesTest()
  3. 调用api,路上会发生一些异常,然后显示错误信息. loadExceptionPlacesTest()

我成功地测试了#1和#2.问题在于#3,我不确定如何在代码中进行测试.

RestApiInterface.kt

interface RestApiInterface {

@GET(RestApiManager.PLACES_URL)
fun getPlacesPagedObservable(
        @Header("header_access_token") accessToken: String?,
        @Query("page") page: Int?
): Observable<PlacesWrapper>
}
Run Code Online (Sandbox Code Playgroud)

RestApiManager.kt 实现接口的管理器类如下所示:

open class RestApiManager: RestApiInterface{
var api: RestApiInterface
    internal set
internal var retrofit: Retrofit
init {
    val logging = HttpLoggingInterceptor()
    // set your desired log level
    logging.setLevel(HttpLoggingInterceptor.Level.BODY)

    val client = okhttp3.OkHttpClient().newBuilder()
            .readTimeout(60, TimeUnit.SECONDS)
            .connectTimeout(60, TimeUnit.SECONDS)
            .addInterceptor(LoggingInterceptor())  
            .build()


    retrofit = Retrofit.Builder()
            .baseUrl(BASE_URL)
            .client(client)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())//very important for RXJAVA and retrofit
            .build()
    api …
Run Code Online (Sandbox Code Playgroud)

java android unit-testing retrofit2 rx-java2

8
推荐指数
1
解决办法
4345
查看次数

在Rx链中多次切换线程

我们假设我有以下Android案例:

  1. 请求来自网络的组列表
  2. 显示每个组的一些UI元素
  3. 请求每个组的项目
  4. 显示每个项目的UI元素

我想用RxJava做到这一点:

webService.requestGroups()
        .flatMap(group -> {
            view.showGroup(group);
            return webService.requestItems(group);
        })
        .toList()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())

        .subscribe(items -> view.showItems(items));
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,我有2个视图对象调用,每个调用都必须在主线程上执行.并且2次调用webService,必须在后台线程上执行.

这段代码的问题:首先调用view会在后台执行导致Android RuntimeException(只有原始线程可能会触及视图或者其他东西)如果我转移.observeOn到链的开头 - 第二次webService调用将在主线程中执行.

如何在RxJava链中多次"游"穿过线程?

android rx-java rx-java2

8
推荐指数
2
解决办法
3340
查看次数

RxJava2表单验证

我有一个表格,有4个可能的选项需要检查(根据情况可能会更少).在创建订单时,有2个editexts,一个用于电子邮件,另一个用于参考字段.

电子邮件和参考字段可能会根据条件(在创建表单时可用)保留为空.此外,我们可能需要显示一个警告对话框,告诉用户可能无法显示参考值(对订单的收件人),他们可能还需要同意条款和条件警报对话框.

目前onConfirm检查是这样的,

void onCreateOrderConfirmed(@Nullable final String receiverEmail,
                            @Nullable final String reference,
                            @Nullable final Boolean noRefAgreed,
                            @Nullable final Boolean termsAndConditionsAgreed) {

    if (!reviewCompletionState.emailRequirementSatisfied()) {
        if (!isValidEmail(receiverEmail)) {
            view.showEmailError();
            return;
        }

        reviewCompletionState = reviewCompletionState.newBuilder()
                .receiverEmail(receiverEmail)
                .emailRequirementSatisfied(true)
                .build();
    }

    if (!reviewCompletionState.referenceRequirementSatisfied()) {
        if (isEmpty(reference)) {
            view.showReferenceError();
            return;
        }

        reviewCompletionState = reviewCompletionState.newBuilder()
                .reference(reference)
                .referenceRequirementSatisfied(true)
                .build();
    }

    if (!reviewCompletionState.noRefAgreed()) {
        if (noRefAgreed == null || !noRefAgreed) {
            view.showNoReferenceAlert();
            return;
        }

        reviewCompletionState = reviewCompletionState.newBuilder()
                .noRefAgreed(true)
                .build();
    }

    if (!reviewCompletionState.termsAndConditionsAgreed()) {
        if (termsAndConditionsAgreed == null || …
Run Code Online (Sandbox Code Playgroud)

validation android rx-java2

8
推荐指数
2
解决办法
1101
查看次数

我是否需要处置()使用LiveDataReactiveStreams创建的发布者

假设我有一个Flowable,它在应用程序的不同部分之间共享.

在我想要观察它的每个片段中,我将其转换为LiveData LiveDataReactiveStreams.fromPublisher以避免泄漏和崩溃.我现在有一个包装我的Flowable的LiveData.

然后我将LiveData传递给我的ViewModel(在ViewModelFactory中).据我了解,我可以继续使用LiveData而不用担心泄漏.

现在,我不想直接观察LiveData,而是试图将它转换回Flowable with LiveDataReactiveStreams.toPublisherFlowable.fromPublisher,而是订阅Flowable.这是一个Flowable,它包装了一个包装Flowable的LiveData

我的问题是:我是否要担心处理此Flowable的订阅?我希望LiveData会充当"障碍",防止我的上下文泄漏回根Flowable,但我不太确定.

换一种说法:

  1. Flowable A存在于全局上下文中
  2. 在每个片段中,A被包装在LiveData B中,该数据被设置为片段ViewModel的属性
  3. 通常我会观察LiveData B,而是将其包装在Flowable C中
  4. 我订阅Flowable C并忽略退回的一次性用品

当碎片被破坏时,在C中访问的视图是否泄漏到A

android memory-leaks rx-java2 android-livedata android-architecture-components

8
推荐指数
1
解决办法
594
查看次数

android MVVM没有数据绑定

问题:我可以在不使用数据绑定的情况下使用MVVM实现Android应用程序.

我试图解决的问题非常简单:从后端API读取项目列表并在Recylerview中显示它们.

我是如何实施的:

在视图中 - 我有Activity和RecyclerViewAdapter模型:ApiResponse和数据模型网络 - 改造API服务,RxJava2

对于ViewModel部分 - 我有一个ViewModel类(它不是从任何东西派生的)基本上调用Retrofit Service并使用RxJava调用获取数据.

ViewModel有如下调用:

 void getItems();
 void addItemData();
 void removeItem();
Run Code Online (Sandbox Code Playgroud)

用RXJava2调用服务

 ObServable<ApiResponse> getItems();
 ObServable<ApiResponse> addItemData();
 ObServable<ApiResponse> removeItem();  
Run Code Online (Sandbox Code Playgroud)

View实例化ViewModel对象.ViewModel在创建期间获取Adapter对象的实例.在视图中,单击按钮会调用Activity中的ClickHandler,该ClickHandler调用ViewModel#getItems()方法.由于ViewModel具有到Adapter的链接,因此viewModel会更新适配器中的项目,以便自动更新RecyclerView.

我不确定这是否适合MVVM.

数据绑定对我来说有点像意大利面.

同样,我们可以在没有DataBinding的情况下在android中实现MVVM吗?方法好吗?

android mvvm android-databinding rx-java2

8
推荐指数
3
解决办法
4346
查看次数

RxJava返回单个,执行后完成

我正在尝试完成以下操作:将一些数据作为单个返回,然后执行完成后.由于single.andThen(),以下代码无法编译.需要按此顺序执行操作.

val single = Single.create<String> { it.onSuccess("result") }
val completable = Completable.create { println("executing cleanup") }
val together = single.andThen(completable)

together.subscribe(
        { println(it) },
        { println(it) }

)
Run Code Online (Sandbox Code Playgroud)

kotlin rx-java rx-java2

8
推荐指数
3
解决办法
5102
查看次数

RxJava2并行执行一堆Completables并等待所有完成

我想执行几种阻塞方法(网络调用,计算任务).我希望并行执行它们,并在它们全部完成时收到通知,如果它们中的任何一个失败则会收到错误(抛出异常).他们不会发出结果所以Observable.zip()不会帮助我.

到目前为止,我有:

Completable a = computationTaskA();
Completable b = computationTaskB();
Completable c = computationTaskC();
Completable all = Completable.concat(Arrays.asList(a, b, c))
            .subscribe(() -> {
               // all succeed
            }, e -> {
               // any fails
            });
Run Code Online (Sandbox Code Playgroud)

不过Completable.concat()文档说Returns a Completable which completes only when all sources complete, one after another..我没有找到可以并行执行它们的解决方案.

java rx-java rx-java2

8
推荐指数
1
解决办法
2894
查看次数

如何将异步promise代码转换为rxjava

我有以下同步代码,我想在RXJava中建模为异步代码.

void executeActions(List<Action> action) {
  if (action == null || action.size() == 0) return;
  for (Action action: actions) { 
     executeActions(action.handle());
  } 
}


class Action {

   //implementation of handle 
   // return List<Action> or null. 
   List<Action> handle() {
   } 
} 
Run Code Online (Sandbox Code Playgroud)

现在在JS中,我可以像Prom这样建模这种与Promises的交互.(下面的伪代码 - 我的JS很弱)

executeActionsAsync(actions) { 
  var p = Promise.resolve(); 
  action.forEach(function(action) { 
    p = p.then(function() { 
           action.handle();    
        })
  } 
  return p; 
} 


class Action() { 
  function handle() { 
    actions = [];// some array of actions. 
    executeAsync(actions);
  } 
} 
Run Code Online (Sandbox Code Playgroud)

我想在RXJava2中对其进行建模.任何帮助表示赞赏.

java android promise rxjs rx-java2

8
推荐指数
1
解决办法
457
查看次数

从Kotlin实现Java接口时出现NullPointerException

我正在尝试BiFunction从Kotlin中的RxJava 实现接口,并且得到了NullPointerException

这是我在Kotlin中实现的Java接口。来自RxJava 2。

package io.reactivex.functions;

import io.reactivex.annotations.NonNull;

/**
 * A functional interface (callback) that computes a value based on multiple input values.
 * @param <T1> the first value type
 * @param <T2> the second value type
 * @param <R> the result type
 */
public interface BiFunction<T1, T2, R> {

    /**
     * Calculate a value based on the input values.
     * @param t1 the first value
     * @param t2 the second value
     * @return the result …
Run Code Online (Sandbox Code Playgroud)

java interface nullpointerexception kotlin rx-java2

8
推荐指数
2
解决办法
427
查看次数