我试图在Android中使用RxJava实现异步任务.我尝试了以下代码,但它没有用.它在UI线程上执行.我使用的是以下版本的RxAndroid 0.24.0.
try {
Observable.just(someMethodWhichThrowsException())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> onMergeComplete());
} catch (IOException e) {
e.printStackTrace();
}
Run Code Online (Sandbox Code Playgroud)
但是,以下对我来说是异步的.
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
someMethodWhichThrowsException();
} catch (IOException e) {
e.printStackTrace();
}
subscriber.onCompleted();
}
});
observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe();
Run Code Online (Sandbox Code Playgroud)
我想了解以下内容:1.它们之间有什么区别?2.创建异步任务时的最佳做法是什么?
提前致谢.
我一直试图确定是否可以创建一个可观察的DialogFragment.基本上我希望能够:
DialogFragmentObservable可以订阅结果的rx (按下确定/取消,字符串输入,后台任务成功/失败等)到目前为止,我发现最接近的是ReactiveDialog,它曾经是RxAndroid的一部分,但是已经从v1.0.0中的RxAndroid中删除,作为简化RxAndroid的一部分.
虽然ReactiveDialog看起来确实符合我的前两个标准,但似乎没有处理配置更改.有两个问题需要考虑:
DialogFragment必须保持其Observable跨配置更改,因此它可以通知其状态的用户.我仍然是RxJava的新手,所以我仍然试图围绕你如何管理这样的事情.看起来它应该是可能的,但我觉得它需要一个静态或单独的Observable管理器,并且可能retainedInstance DialogFragments.
任何人对此有任何建议或最佳做法?
我正在使用Retrofit 2.0.0-beta2和RxJava 1.0.14.我以这种方式处理错误,因为我需要在doFinally中执行一些代码:
.onErrorResumeNext(Observable.empty());
但是当我得到带有错误的http响应时(例如401),我的应用程序崩溃,堆栈跟踪中没有我的类.什么也没有发生,如果使用Observable.never.这是完整的堆栈跟踪:
java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
at rx.internal.schedulers.ScheduledAction.run (ScheduledAction.java:60)
at android.os.Handler.handleCallback (Handler.java:739)
at android.os.Handler.dispatchMessage (Handler.java:95)
at android.os.Looper.loop (Looper.java:135)
at android.app.ActivityThread.main (ActivityThread.java:5221)
at java.lang.reflect.Method.invoke (Unknown source)
at java.lang.reflect.Method.invoke (Method.java:372)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run (ZygoteInit.java:899)
at com.android.internal.os.ZygoteInit.main (ZygoteInit.java:694)
rx.exceptions.OnErrorNotImplementedException: Sequence contains no elements
at rx.Observable$27.onError (Observable.java:7535)
at rx.observers.SafeSubscriber._onError (SafeSubscriber.java:154)
at rx.observers.SafeSubscriber.onError (SafeSubscriber.java:111)
at rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:70)
at rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:70)
at rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:70)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue (OperatorObserveOn.java:197)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call (OperatorObserveOn.java:170)
at rx.internal.schedulers.ScheduledAction.run (ScheduledAction.java:55)
at android.os.Handler.handleCallback (Handler.java:739)
at …Run Code Online (Sandbox Code Playgroud) 我正在寻找一个可以的主题(或类似的东西):
BehaviorSubject 几乎可以完成这项工作,但它保留了最后观察到的项目.
UPDATE
基于已接受的答案,我为单个观察项目制定了类似的解决方案.还添加了取消订阅部分以避免内存泄漏.
class LastEventObservable private constructor(
private val onSubscribe: OnSubscribe<Any>,
private val state: State
) : Observable<Any>(onSubscribe) {
fun emit(value: Any) {
if (state.subscriber.hasObservers()) {
state.subscriber.onNext(value)
} else {
state.lastItem = value
}
}
companion object {
fun create(): LastEventObservable {
val state = State()
val onSubscribe = OnSubscribe<Any> { subscriber ->
just(state.lastItem)
.filter { it != null }
.doOnNext { subscriber.onNext(it) }
.doOnCompleted { state.lastItem = null }
.subscribe()
val subscription = state.subscriber.subscribe(subscriber) …Run Code Online (Sandbox Code Playgroud) 让我们假设控制器的这两种情况产生一些延迟的随机数:
1)Reactive Spring 5反应性应用:
@GetMapping("/randomNumbers")
public Flux<Double> getReactiveRandomNumbers() {
return generateRandomNumbers(10, 500);
}
/**
* Non-blocking randon number generator
* @param amount - # of numbers to generate
* @param delay - delay between each number generation in milliseconds
* @return
*/
public Flux<Double> generateRandomNumbers(int amount, int delay){
return Flux.range(1, amount)
.delayMillis(delay)
.map(i -> Math.random());
}
Run Code Online (Sandbox Code Playgroud)
2)传统的Spring MVC DeferredResult:
@GetMapping("/randomNumbers")
public DeferredResult<Double[]> getReactiveRandomNumbers() {
DeferredResult<Double[]> dr = new DeferredResult<Double[]>();
CompletableFuture.supplyAsync(() -> {
return generateRandomNumbers(10, 500);
}).whenCompleteAsync((p1, p2) -> { …Run Code Online (Sandbox Code Playgroud) 当我知道在我的组件/类超出范围之前,observable肯定会完成(通过complete或error通知),我是否还需要取消订阅才能防止内存泄漏?换句话说,完成/错误编辑是否可以自动清理,所以我不必担心?
在rxjava 1中的Observable类中有一个from方法但在rxjava 2中找不到.如何在以下代码中替换rxjava 2中的from方法:
List<Integer> ints = new ArrayList<>();
for (int i=1; i<10; i++) {
ints.add(new Integer(i));
}
Observable.just(ints)
.flatMap(new Function<List<Integer>, Observable<Integer>>() {
@Override
public Observable<Integer> apply(List<Integer> ints) {
return Observable.from(ints);
}
})
Run Code Online (Sandbox Code Playgroud) 如果我写这样的事情,那么这两个操作和通知将是对当前线程...
Observable.fromCallable(() -> "Do Something")
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
如果我在后台线程这样的操作,那么这两个操作和通知将在后台线程...
Observable.fromCallable(() -> "Do Something")
.subscribeOn(Schedulers.io())
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
如果我想在主线程上观察并在Android中执行后台我会做...
Observable.fromCallable(() -> "Do Something")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
但是如果我正在编写一个标准的Java程序,那么你想在主线程上观察到的状态是什么呢?
在我的Android应用程序中,我有一个处理用户交互的演示者,包含一种请求管理器,如果需要,还可以通过请求管理器向请求管理器发送用户输入.
请求管理器本身包含服务器API并使用此RxJava处理服务器请求.
我有一个代码,每次用户输入消息并向服务器显示响应时,它会向服务器发送请求:
private Observable<List<Answer>> sendRequest(String request) {
MyRequest request = new MyRequest();
request.setInput(request);
return Observable.fromCallable(() -> serverApi.process(request))
.doOnNext(myResponse -> {
// store some data
})
.map(MyResponse::getAnswers)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
}
Run Code Online (Sandbox Code Playgroud)
但是现在我需要有一种排队.用户可以在服务器响应之前发送新消息.应按顺序处理队列中的每条消息.即,在我们得到对第一条消息的响应后,将发送第二条消息,依此类推.
如果发生错误,则不应处理进一步的请求.
我还需要在RecyclerView中显示答案.
我不知道如何更改上面的代码来实现上述处理
我看到了一些问题.一方面,该队列可以由用户随时更新,另一方面,任何时候服务器发送响应消息应该从队列中删除.
也许有一个rxjava运算符或我错过的特殊方式.
我在这里看到了类似的答案,但是,"队列"是不变的. 使用RxJava和Retrofit进行N次连续api调用
我会非常感谢任何解决方案或链接
我正在寻找一种更好的方法来在使用RxJava时为空结果实现一个简单的Observable回退系统.我们的想法是,如果对一组数据的本地查询导致零项,则应该进行后备查询(可以是网络调用,或其他).目前,我的代码包含以下内容:
Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
List<Object> results = queryLocalDatabase();
if (results.isEmpty()) {
subscriber.onError(new Throwable("Empty results"));
} else {
// Process results...
}
}
}).onErrorResumeNext(/* Fallback Observable goes here */);
Run Code Online (Sandbox Code Playgroud)
虽然这有效,但为空结果集抛出异常并没有多大意义.我注意到有条件运算符可用isEmpty,但是它似乎并不像我想要的那样.例如,使用isEmpty......
localObservable = Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
List<Object> results = queryLocalDatabase();
for (Object obj : results) {
// Process object
subscriber.onNext(object);
}
subscriber.onCompleted();
}
});
localObservable.isEmpty()
.switchMap(new Func1<Boolean, Observable<? extends Object>>() {
@Override …Run Code Online (Sandbox Code Playgroud)