标签: rx-java

Android 中的 RxJava 没有 Observable.create?

我试图从这里运行这个例子

Observable<String> values = Observable.create(o -> {
    o.onNext("Hello");
    o.onCompleted();
});
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
Run Code Online (Sandbox Code Playgroud)

除了我不能在 Android Studio 中使用 Lambda 之外,没有 Observable.create()。我唯一的选择是 Observable.class?

我正在使用 1.1.6 版并通过 Gradle 获取库。

android android-studio rx-java

0
推荐指数
1
解决办法
171
查看次数

RxJava:数据库和远程服务器

我有一个要从本地数据库(如果可用)或其他远程服务器检索的对象列表。我正在使用 RxJava Observables(数据库使用 SqlBrite,远程服务器使用 Retrofit)。

我的查询代码如下:

Observable<List<MyObject>> dbObservable = mDatabase
            .createQuery(MyObject.TABLE_NAME,MyObject.SELECT_TYPE_A)
            .mapToList(MyObject.LOCAL_MAPPER);
Observable<List<MyObject>> remoteObservable = mRetrofitService.getMyObjectApiService().getMyObjects();

return Observable.concat(dbObservable, remoteObservable)
    .first(new Func1<List<MyObject>, Boolean>() {
                @Override
                public Boolean call(List<MyObject> myObjects) {
                    return !myObjects.isEmpty();
                }
            });
Run Code Online (Sandbox Code Playgroud)

我看到第一个 observable 正在运行并使用空列表命中第一个方法,但是改造后的 observable 没有运行,没有网络请求。如果我切换 observable 的顺序,或者只是返回远程 observable,它会按预期工作,它会访问远程服务器并返回对象列表。

为什么远程 observable 在这种情况下无法运行?当我首先将 observables 与 db 连接起来,然后再进行改造时,不会调用订阅者的 onNext、orError 和 onComplete 方法。

谢谢!

android rx-java retrofit2

0
推荐指数
1
解决办法
3302
查看次数

RxAndroid - BehaviorSubject 未在 onNext 上发射

我正在尝试使用 MVVM 模式学习 RXJava。

这是我试图实现的场景:

在某些搜索事件中,我正在调用 SearchViewModel.handleSearchTopic() ,它正在发出列表,但不知何故它没有被观察者的 onNext 事件捕获。订阅也成功进行。我想我犯了一些简单的错误,请指出。另外,有没有更好的方法来实现这个用例?

搜索视图模型.java

private final BehaviorSubject<List<Topic>> topicList = BehaviorSubject.create();

public void handleSearchTopic() {
    List<Topic> list = //getsomehow;
    topicList.onNext(list);
}

public Observable<List<Topic>> getTopicListObservable() {
    return topicList.asObservable();
}
Run Code Online (Sandbox Code Playgroud)

片段.java

@NonNull
private CompositeSubscription subscription;

@NonNull
private SearchViewModel searchViewModel;

@Override
public void onActivityCreated(Bundle savedInstanceState) {
    super.onActivityCreated(savedInstanceState);
    searchViewModel = new SearchViewModel();
    bind();
}

@Override
public void onDestroy() {
    unBind();
    super.onDestroy();
}

private void bind() {
    subscription = new CompositeSubscription();

    subscription.add(searchViewModel.getTopicListObservable()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<List<Topic>>() {
                @Override
                public void …
Run Code Online (Sandbox Code Playgroud)

java android mvvm rx-java

0
推荐指数
1
解决办法
1552
查看次数

在 Android 单元测试中为 doNothing.when 子句接收 Mockito UnfinishedStubbingException

首先,我的设置是 RXJava 1、Retrofit 2,我使用的是 Java 7。

我有一个方法,当它被调用时,会将一个原子布尔值设置为 true。

然后该方法调用改造 API。

完成后,超时等......原子布尔值重置为false。

因此,我想测试一下,当我调用我的方法时,Atomic Boolean 设置为 true。

因此,我执行以下操作:

    assertFalse(orderUseCase.isOrderInProcess());

    orderUseCase.execute(id, orderWrapper, ts);

    assertTrue(orderUseCase.isOrderInProcess());
Run Code Online (Sandbox Code Playgroud)

测试布尔值是否为假。执行我的用例测试布尔值是否为真。

现在,为了执行最后一个测试,我需要确保 API 在调用时什么都不做(execute 方法将调用改造 API。

为了尝试什么都不做,我在测试用例的开头使用了以下行。

doNothing().when(orderAPI.orderComplete(anyString(), any(OrderWrapper.class)));
Run Code Online (Sandbox Code Playgroud)

但是,我收到以下错误:

org.mockito.exceptions.misusing.UnfinishedStubbingException: 
Unfinished stubbing detected here:
-> at com.tfds.xms.unit_test.SingleTest.TestAtomicBooleanLocked(SingleTest.java:90)

E.g. thenReturn() may be missing.
Examples of correct stubbing:
    when(mock.isOk()).thenReturn(true);
    when(mock.isOk()).thenThrow(exception);
    doThrow(exception).when(mock).someVoidMethod();
Hints:
 1. missing thenReturn()
 2. you are trying to stub a final method, you naughty developer!
 3: you are stubbing the behaviour of another mock inside before 'thenReturn' instruction …
Run Code Online (Sandbox Code Playgroud)

android unit-testing mockito powermock rx-java

0
推荐指数
1
解决办法
5295
查看次数

Rxjava:如何在没有完成所有 observable 的情况下组合多个 observable?

我有多个热的 observables,它们可能会或可能不会发出项目。因此,我想组合 observables,然后在它们中的任何一个发出结果时处理结果,但如果其他 observables 在 item 处发出,则它们应该一起处理。

例如。

observable1 = PublishSubject<>()  
observable2 = PublishSubject<>()

observable1.onNext(1)  
observable1.onNext(2)  
observable2.onNext("Test")  
observable1.onNext(3)
Run Code Online (Sandbox Code Playgroud)

应该发出:

(1, null) 
(2, null)
(2, "Test")
(3, "Test")
Run Code Online (Sandbox Code Playgroud)

也有可能observable2在之前被发射observable1

CombineLatest是最接近我需要的,但只有在所有可观察对象至少发出一项时才会发出结果。是否有一个反应式运算符?

java reactive-programming rx-java rx-java2

0
推荐指数
1
解决办法
1309
查看次数

如何避免 RxJava 中的嵌套回调?

我正在使用Android 的Reactive Network库。我是 RxJava 的绝对初学者,我正在努力解决它。我想做的是:

1]持续观察手机网络连接状态的变化

2]如果手机已连接到网络,请检查一次是否有互联网连接

为此,我在 Kotlin 中有以下代码:

    ReactiveNetwork.observeNetworkConnectivity(applicationContext)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe { connectivity ->
                if (connectivity.state == NetworkInfo.State.CONNECTED) {
                    ReactiveNetwork.checkInternetConnectivity()
                            .subscribeOn(Schedulers.io())
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe { isConnectedToInternet ->
                                if (isConnectedToInternet) {
                                    Log.d("VED-APP", "Connected to Internet")
                                } else {
                                    Log.d("VED-APP", "Not Connected to Internet")
                                }
                            }
                } 
            }
Run Code Online (Sandbox Code Playgroud)

然而,这段代码很丑陋,而且非常嵌套。有没有办法清理这段代码?

尽管示例是 Kotlin 中的,但 Java 或 Kotlin 中的答案都会有所帮助。

android kotlin rx-java rx-java2

0
推荐指数
1
解决办法
365
查看次数

在 MVVM 架构中用 RxJava 替换 LiveData

LiveData似乎非常有用,因为它只在视图处于活动状态时通知视图。它还在订阅后立即存储并返回最后一个值给新订阅者。
我的问题是如何仅使用 RxJava 实现相同的功能?

由于 Rx 是一个功能齐全的反应式解决方案,将它与另一个反应式解决方案结合起来似乎不太对。我更喜欢是否可以从项目中删除 LiveData。

我知道https://github.com/trello/RxLifecyclehttps://github.com/uber/AutoDispose但他们所做的是取消订阅流。我不想要那个。只要视图模型还活着,我就希望我的流存在,但我的活动会根据生命周期开始和停止收听 Steam。

任何建议将不胜感激

android mvvm rx-java android-livedata

0
推荐指数
1
解决办法
4506
查看次数

如果 RxJava 失败,如何恢复映射列表

给定以下代码,onErrorResumeNext 方法只会用“hi”替换列表中的错误项。我将如何让它继续迭代列表的其余部分?

List<Object> list = new ArrayList<>();
list.add("one");
list.add(new Testo());
list.add("two");
list.add("three");    

Observable.fromIterable(list)
    .map(item -> item.toString())
    .onErrorResumeNext(Observable.just("hi"))
    .subscribe(item -> System.out.println(item), onError -> System.out.println("error"));

private static class Testo {
    @Override
    public String toString() {
        throw new NullPointerException();
    }
}
Run Code Online (Sandbox Code Playgroud)

rx-java reactivex rx-java2

0
推荐指数
1
解决办法
385
查看次数

RxJava。Observable.OnSubscribe 无法解析为类型

我正在尝试用 RxJava 编写我的第一个代码,但我想我遇到了一些库导入错误。

package second.pack;
import io.reactivex.Observable;
public class Main {
    public static void main(String[] args) {
        Observable <String> observable = Observable.create(
// the line below marked as it has an error in Eclipse
/* 7 line */    new Observable.OnSubscribe<String>(){
                    @Override
                    public void call (Subscriber <String> sub){
                      sub.onNext("New Datas");
                      sub.onComplete(); 
                }
            }
    );
}
Run Code Online (Sandbox Code Playgroud)

}

错误

Exception in thread "main" java.lang.Error: Unresolved compilation problem: 
    Observable.OnSubscribe cannot be resolved to a type
    at second.pack.Main.main(Main.java:7)
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

请问有人能帮我解决这个错误吗?非常感谢!

java rx-java

0
推荐指数
1
解决办法
1099
查看次数

PublishSubject blocksLast() 挂起 Android 应用程序而不调用

我在从主题获取最后发出的值时遇到问题

这是我的班级,负责发射和观察电池变化:

class BatteryLevelProvider @Inject constructor(
app: App
) {

private val context: Context = app
private val receiver: PowerConnectionReceiver = PowerConnectionReceiver()

init {
initializeReceiver()
}

private fun initializeReceiver() {
IntentFilter(Intent.ACTION_BATTERY_CHANGED).let { intentFilter ->
  context.registerReceiver(receiver, intentFilter)
  }
}

companion object {
 val batteryLevelSubject = PublishSubject.create<Int>()
}

fun observeBatteryLevel(): Observable<Int> = batteryLevelSubject.distinctUntilChanged()

fun getCurrentBatteryLevel(): Int {
Timber.d("getCurrentBatteryLevel: ENTERED")
val blockingLast = batteryLevelSubject.blockingLast(0)
Timber.d("getCurrentBatteryLevel: $blockingLast")
return blockingLast
}

inner class PowerConnectionReceiver : BroadcastReceiver() {

override fun onReceive(context: Context, intent: Intent) {
  val …
Run Code Online (Sandbox Code Playgroud)

android kotlin rx-java rx-java2

0
推荐指数
1
解决办法
596
查看次数