我试图从这里运行这个例子
Observable<String> values = Observable.create(o -> {
o.onNext("Hello");
o.onCompleted();
});
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Run Code Online (Sandbox Code Playgroud)
除了我不能在 Android Studio 中使用 Lambda 之外,没有 Observable.create()。我唯一的选择是 Observable.class?
我正在使用 1.1.6 版并通过 Gradle 获取库。
我有一个要从本地数据库(如果可用)或其他远程服务器检索的对象列表。我正在使用 RxJava Observables(数据库使用 SqlBrite,远程服务器使用 Retrofit)。
我的查询代码如下:
Observable<List<MyObject>> dbObservable = mDatabase
.createQuery(MyObject.TABLE_NAME,MyObject.SELECT_TYPE_A)
.mapToList(MyObject.LOCAL_MAPPER);
Observable<List<MyObject>> remoteObservable = mRetrofitService.getMyObjectApiService().getMyObjects();
return Observable.concat(dbObservable, remoteObservable)
.first(new Func1<List<MyObject>, Boolean>() {
@Override
public Boolean call(List<MyObject> myObjects) {
return !myObjects.isEmpty();
}
});
Run Code Online (Sandbox Code Playgroud)
我看到第一个 observable 正在运行并使用空列表命中第一个方法,但是改造后的 observable 没有运行,没有网络请求。如果我切换 observable 的顺序,或者只是返回远程 observable,它会按预期工作,它会访问远程服务器并返回对象列表。
为什么远程 observable 在这种情况下无法运行?当我首先将 observables 与 db 连接起来,然后再进行改造时,不会调用订阅者的 onNext、orError 和 onComplete 方法。
谢谢!
我正在尝试使用 MVVM 模式学习 RXJava。
这是我试图实现的场景:
在某些搜索事件中,我正在调用 SearchViewModel.handleSearchTopic() ,它正在发出列表,但不知何故它没有被观察者的 onNext 事件捕获。订阅也成功进行。我想我犯了一些简单的错误,请指出。另外,有没有更好的方法来实现这个用例?
搜索视图模型.java
private final BehaviorSubject<List<Topic>> topicList = BehaviorSubject.create();
public void handleSearchTopic() {
List<Topic> list = //getsomehow;
topicList.onNext(list);
}
public Observable<List<Topic>> getTopicListObservable() {
return topicList.asObservable();
}
Run Code Online (Sandbox Code Playgroud)
片段.java
@NonNull
private CompositeSubscription subscription;
@NonNull
private SearchViewModel searchViewModel;
@Override
public void onActivityCreated(Bundle savedInstanceState) {
super.onActivityCreated(savedInstanceState);
searchViewModel = new SearchViewModel();
bind();
}
@Override
public void onDestroy() {
unBind();
super.onDestroy();
}
private void bind() {
subscription = new CompositeSubscription();
subscription.add(searchViewModel.getTopicListObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<Topic>>() {
@Override
public void …Run Code Online (Sandbox Code Playgroud) 首先,我的设置是 RXJava 1、Retrofit 2,我使用的是 Java 7。
我有一个方法,当它被调用时,会将一个原子布尔值设置为 true。
然后该方法调用改造 API。
完成后,超时等......原子布尔值重置为false。
因此,我想测试一下,当我调用我的方法时,Atomic Boolean 设置为 true。
因此,我执行以下操作:
assertFalse(orderUseCase.isOrderInProcess());
orderUseCase.execute(id, orderWrapper, ts);
assertTrue(orderUseCase.isOrderInProcess());
Run Code Online (Sandbox Code Playgroud)
测试布尔值是否为假。执行我的用例测试布尔值是否为真。
现在,为了执行最后一个测试,我需要确保 API 在调用时什么都不做(execute 方法将调用改造 API。
为了尝试什么都不做,我在测试用例的开头使用了以下行。
doNothing().when(orderAPI.orderComplete(anyString(), any(OrderWrapper.class)));
Run Code Online (Sandbox Code Playgroud)
但是,我收到以下错误:
org.mockito.exceptions.misusing.UnfinishedStubbingException:
Unfinished stubbing detected here:
-> at com.tfds.xms.unit_test.SingleTest.TestAtomicBooleanLocked(SingleTest.java:90)
E.g. thenReturn() may be missing.
Examples of correct stubbing:
when(mock.isOk()).thenReturn(true);
when(mock.isOk()).thenThrow(exception);
doThrow(exception).when(mock).someVoidMethod();
Hints:
1. missing thenReturn()
2. you are trying to stub a final method, you naughty developer!
3: you are stubbing the behaviour of another mock inside before 'thenReturn' instruction …Run Code Online (Sandbox Code Playgroud) 我有多个热的 observables,它们可能会或可能不会发出项目。因此,我想组合 observables,然后在它们中的任何一个发出结果时处理结果,但如果其他 observables 在 item 处发出,则它们应该一起处理。
例如。
observable1 = PublishSubject<>()
observable2 = PublishSubject<>()
observable1.onNext(1)
observable1.onNext(2)
observable2.onNext("Test")
observable1.onNext(3)
Run Code Online (Sandbox Code Playgroud)
应该发出:
(1, null)
(2, null)
(2, "Test")
(3, "Test")
Run Code Online (Sandbox Code Playgroud)
也有可能observable2在之前被发射observable1
CombineLatest是最接近我需要的,但只有在所有可观察对象至少发出一项时才会发出结果。是否有一个反应式运算符?
我正在使用Android 的Reactive Network库。我是 RxJava 的绝对初学者,我正在努力解决它。我想做的是:
1]持续观察手机网络连接状态的变化
2]如果手机已连接到网络,请检查一次是否有互联网连接
为此,我在 Kotlin 中有以下代码:
ReactiveNetwork.observeNetworkConnectivity(applicationContext)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { connectivity ->
if (connectivity.state == NetworkInfo.State.CONNECTED) {
ReactiveNetwork.checkInternetConnectivity()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { isConnectedToInternet ->
if (isConnectedToInternet) {
Log.d("VED-APP", "Connected to Internet")
} else {
Log.d("VED-APP", "Not Connected to Internet")
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
然而,这段代码很丑陋,而且非常嵌套。有没有办法清理这段代码?
尽管示例是 Kotlin 中的,但 Java 或 Kotlin 中的答案都会有所帮助。
LiveData似乎非常有用,因为它只在视图处于活动状态时通知视图。它还在订阅后立即存储并返回最后一个值给新订阅者。
我的问题是如何仅使用 RxJava 实现相同的功能?
由于 Rx 是一个功能齐全的反应式解决方案,将它与另一个反应式解决方案结合起来似乎不太对。我更喜欢是否可以从项目中删除 LiveData。
我知道https://github.com/trello/RxLifecycle和https://github.com/uber/AutoDispose但他们所做的是取消订阅流。我不想要那个。只要视图模型还活着,我就希望我的流存在,但我的活动会根据生命周期开始和停止收听 Steam。
任何建议将不胜感激
给定以下代码,onErrorResumeNext 方法只会用“hi”替换列表中的错误项。我将如何让它继续迭代列表的其余部分?
List<Object> list = new ArrayList<>();
list.add("one");
list.add(new Testo());
list.add("two");
list.add("three");
Observable.fromIterable(list)
.map(item -> item.toString())
.onErrorResumeNext(Observable.just("hi"))
.subscribe(item -> System.out.println(item), onError -> System.out.println("error"));
private static class Testo {
@Override
public String toString() {
throw new NullPointerException();
}
}
Run Code Online (Sandbox Code Playgroud) 我正在尝试用 RxJava 编写我的第一个代码,但我想我遇到了一些库导入错误。
package second.pack;
import io.reactivex.Observable;
public class Main {
public static void main(String[] args) {
Observable <String> observable = Observable.create(
// the line below marked as it has an error in Eclipse
/* 7 line */ new Observable.OnSubscribe<String>(){
@Override
public void call (Subscriber <String> sub){
sub.onNext("New Datas");
sub.onComplete();
}
}
);
}
Run Code Online (Sandbox Code Playgroud)
}
错误
Exception in thread "main" java.lang.Error: Unresolved compilation problem:
Observable.OnSubscribe cannot be resolved to a type
at second.pack.Main.main(Main.java:7)
Run Code Online (Sandbox Code Playgroud)
请问有人能帮我解决这个错误吗?非常感谢!
我在从主题获取最后发出的值时遇到问题
这是我的班级,负责发射和观察电池变化:
class BatteryLevelProvider @Inject constructor(
app: App
) {
private val context: Context = app
private val receiver: PowerConnectionReceiver = PowerConnectionReceiver()
init {
initializeReceiver()
}
private fun initializeReceiver() {
IntentFilter(Intent.ACTION_BATTERY_CHANGED).let { intentFilter ->
context.registerReceiver(receiver, intentFilter)
}
}
companion object {
val batteryLevelSubject = PublishSubject.create<Int>()
}
fun observeBatteryLevel(): Observable<Int> = batteryLevelSubject.distinctUntilChanged()
fun getCurrentBatteryLevel(): Int {
Timber.d("getCurrentBatteryLevel: ENTERED")
val blockingLast = batteryLevelSubject.blockingLast(0)
Timber.d("getCurrentBatteryLevel: $blockingLast")
return blockingLast
}
inner class PowerConnectionReceiver : BroadcastReceiver() {
override fun onReceive(context: Context, intent: Intent) {
val …Run Code Online (Sandbox Code Playgroud)