标签: rx-java

Android中的RxJava异步任务

我试图在Android中使用RxJava实现异步任务.我尝试了以下代码,但它没有用.它在UI线程上执行.我使用的是以下版本的RxAndroid 0.24.0.

try {
            Observable.just(someMethodWhichThrowsException())
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(s -> onMergeComplete());
        } catch (IOException e) {
            e.printStackTrace();
        }
Run Code Online (Sandbox Code Playgroud)

但是,以下对我来说是异步的.

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    someMethodWhichThrowsException();
                } catch (IOException e) {
                    e.printStackTrace();
                }

                subscriber.onCompleted();
            }
        });
        observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe();
Run Code Online (Sandbox Code Playgroud)

我想了解以下内容:1.它们之间有什么区别?2.创建异步任务时的最佳做法是什么?

提前致谢.

android android-asynctask rx-java

14
推荐指数
2
解决办法
1万
查看次数

如何使用RxJava管理DialogFragment?

我一直试图确定是否可以创建一个可观察的DialogFragment.基本上我希望能够:

  • 创建并显示一个 DialogFragment
  • 获取Observable可以订阅结果的rx (按下确定/取消,字符串输入,后台任务成功/失败等)
  • 正确处理配置更改

到目前为止,我发现最接近的是ReactiveDialog,它曾经是RxAndroid的一部分,但是已经从v1.0.0中的RxAndroid中删除,作为简化RxAndroid的一部分.

虽然ReactiveDialog看起来确实符合我的前两个标准,但似乎没有处理配置更改.有两个问题需要考虑:

  1. DialogFragment必须保持其Observable跨配置更改,因此它可以通知其状态的用户.
  2. 订户必须能够在配置更改后保持订阅或重新订阅(显然不会产生内存泄漏).

我仍然是RxJava的新手,所以我仍然试图围绕你如何管理这样的事情.看起来它应该是可能的,但我觉得它需要一个静态或单独的Observable管理器,并且可能retainedInstance DialogFragments.

任何人对此有任何建议或最佳做法?

java android android-dialogfragment rx-java

14
推荐指数
1
解决办法
1559
查看次数

Observable.empty()导致java.util.NoSuchElementException:Sequence不包含任何元素

我正在使用Retrofit 2.0.0-beta2和RxJava 1.0.14.我以这种方式处理错误,因为我需要在doFinally中执行一些代码:

.onErrorResumeNext(Observable.empty());

但是当我得到带有错误的http响应时(例如401),我的应用程序崩溃,堆栈跟踪中没有我的类.什么也没有发生,如果使用Observable.never.这是完整的堆栈跟踪:

java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
at rx.internal.schedulers.ScheduledAction.run (ScheduledAction.java:60)
at android.os.Handler.handleCallback (Handler.java:739)
at android.os.Handler.dispatchMessage (Handler.java:95)
at android.os.Looper.loop (Looper.java:135)
at android.app.ActivityThread.main (ActivityThread.java:5221)
at java.lang.reflect.Method.invoke (Unknown source)
at java.lang.reflect.Method.invoke (Method.java:372)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run (ZygoteInit.java:899)
at com.android.internal.os.ZygoteInit.main (ZygoteInit.java:694)

rx.exceptions.OnErrorNotImplementedException: Sequence contains no elements
at rx.Observable$27.onError (Observable.java:7535)
at rx.observers.SafeSubscriber._onError (SafeSubscriber.java:154)
at rx.observers.SafeSubscriber.onError (SafeSubscriber.java:111)
at rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:70)
at rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:70)
at rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:70)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue (OperatorObserveOn.java:197)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call (OperatorObserveOn.java:170)
at rx.internal.schedulers.ScheduledAction.run (ScheduledAction.java:55)
at android.os.Handler.handleCallback (Handler.java:739)
at …
Run Code Online (Sandbox Code Playgroud)

java android nosuchelementexception rx-java retrofit

14
推荐指数
1
解决办法
9264
查看次数

像RxJava中的Subject一样排队

我正在寻找一个可以的主题(或类似的东西):

  1. 如果没有订阅者,可以接收项目并将其保留在队列或缓冲区中
  2. 一旦我们有订户,所有物品都被消耗掉并且再也不会被发射
  3. 我可以订阅/取消订阅主题

BehaviorSubject 几乎可以完成这项工作,但它保留了最后观察到的项目.

UPDATE

基于已接受的答案,我为单个观察项目制定了类似的解决方案.还添加了取消订阅部分以避免内存泄漏.

class LastEventObservable private constructor(
        private val onSubscribe: OnSubscribe<Any>,
        private val state: State
) : Observable<Any>(onSubscribe) {

    fun emit(value: Any) {
        if (state.subscriber.hasObservers()) {
            state.subscriber.onNext(value)
        } else {
            state.lastItem = value
        }
    }

    companion object {
        fun create(): LastEventObservable {
            val state = State()

            val onSubscribe = OnSubscribe<Any> { subscriber ->
                just(state.lastItem)
                        .filter { it != null }
                        .doOnNext { subscriber.onNext(it) }
                        .doOnCompleted { state.lastItem = null }
                        .subscribe()

                val subscription = state.subscriber.subscribe(subscriber) …
Run Code Online (Sandbox Code Playgroud)

rx-java

14
推荐指数
1
解决办法
5407
查看次数

Web反应式编程 - 从HTTP客户端的角度来看,有哪些优势?

让我们假设控制器的这两种情况产生一些延迟的随机数:

1)Reactive Spring 5反应性应用:

@GetMapping("/randomNumbers")
public Flux<Double> getReactiveRandomNumbers() {
    return generateRandomNumbers(10, 500);
}

/**
 * Non-blocking randon number generator
 * @param amount - # of numbers to generate
 * @param delay - delay between each number generation in milliseconds
 * @return
 */
public Flux<Double> generateRandomNumbers(int amount, int delay){
    return Flux.range(1, amount)
               .delayMillis(delay)
               .map(i -> Math.random());
}
Run Code Online (Sandbox Code Playgroud)

2)传统的Spring MVC DeferredResult:

@GetMapping("/randomNumbers")
public DeferredResult<Double[]> getReactiveRandomNumbers() {
    DeferredResult<Double[]> dr = new DeferredResult<Double[]>();

    CompletableFuture.supplyAsync(() -> {
        return generateRandomNumbers(10, 500);
    }).whenCompleteAsync((p1, p2) -> { …
Run Code Online (Sandbox Code Playgroud)

spring reactive-programming rx-java project-reactor

14
推荐指数
1
解决办法
2248
查看次数

我们是否需要取消订阅完成/错误输出的可观察性?

当我知道我的组件/类超出范围之前,observable肯定会完成(通过completeerror通知),我是否还需要取消订阅才能防止内存泄漏?换句话说,完成/错误编辑是否可以自动清理,所以我不必担心?

system.reactive rxjs rx-java angular

14
推荐指数
1
解决办法
1352
查看次数

无法在rxjava 2中解析方法Observable.from

在rxjava 1中的Observable类中有一个from方法但在rxjava 2中找不到.如何在以下代码中替换rxjava 2中的from方法:

    List<Integer> ints = new ArrayList<>();
    for (int i=1; i<10; i++) {
        ints.add(new Integer(i));
    }
    Observable.just(ints)
            .flatMap(new Function<List<Integer>, Observable<Integer>>() {
                @Override
                public Observable<Integer> apply(List<Integer> ints) {
                    return Observable.from(ints);
                }
            })
Run Code Online (Sandbox Code Playgroud)

rx-java rx-java2

14
推荐指数
3
解决办法
6665
查看次数

RxJava Scheduler在主线程上观察

如果我写这样的事情,那么这两个操作和通知将是对当前线程...

Observable.fromCallable(() -> "Do Something")
    .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

如果我在后台线程这样的操作,那么这两个操作和通知将在后台线程...

Observable.fromCallable(() -> "Do Something")
    .subscribeOn(Schedulers.io())
    .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

如果我想在主线程上观察并在Android中执行后台我会做...

Observable.fromCallable(() -> "Do Something")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

但是如果我正在编写一个标准的Java程序,那么你想在主线程上观察到的状态是什么呢?

java rx-java rx-java2

14
推荐指数
2
解决办法
5889
查看次数

RxJava.顺序执行

在我的Android应用程序中,我有一个处理用户交互的演示者,包含一种请求管理器,如果需要,还可以通过请求管理器向请求管理器发送用户输入.

请求管理器本身包含服务器API并使用此RxJava处理服务器请求.

我有一个代码,每次用户输入消息并向服务器显示响应时,它会向服务器发送请求:

private Observable<List<Answer>> sendRequest(String request) {
    MyRequest request = new MyRequest();
    request.setInput(request);
    return Observable.fromCallable(() -> serverApi.process(request))
            .doOnNext(myResponse -> {
                // store some data
            })
            .map(MyResponse::getAnswers)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread());
}
Run Code Online (Sandbox Code Playgroud)

但是现在我需要有一种排队.用户可以在服务器响应之前发送新消息.应按顺序处理队列中的每条消息.即,在我们得到对第一条消息的响应后,将发送第二条消息,依此类推.

如果发生错误,则不应处理进一步的请求.

我还需要在RecyclerView中显示答案.

我不知道如何更改上面的代码来实现上述处理

我看到了一些问题.一方面,该队列可以由用户随时更新,另一方面,任何时候服务器发送响应消息应该从队列中删除.

也许有一个rxjava运算符或我错过的特殊方式.

我在这里看到了类似的答案,但是,"队列"是不变的. 使用RxJava和Retrofit进行N次连续api调用

我会非常感谢任何解决方案或链接

android rx-java rx-java2

14
推荐指数
2
解决办法
4484
查看次数

RxJava的后备可观察

我正在寻找一种更好的方法来在使用RxJava时为空结果实现一个简单的Observable回退系统.我们的想法是,如果对一组数据的本地查询导致零项,则应该进行后备查询(可以是网络调用,或其他).目前,我的代码包含以下内容:

Observable.create(new Observable.OnSubscribe<Object>() {
  @Override
  public void call(Subscriber<? super Object> subscriber) {
     List<Object> results = queryLocalDatabase();
     if (results.isEmpty()) {
       subscriber.onError(new Throwable("Empty results"));
     } else {
       // Process results...
     }
  }
}).onErrorResumeNext(/* Fallback Observable goes here */);
Run Code Online (Sandbox Code Playgroud)

虽然这有效,但为空结果集抛出异常并没有多大意义.我注意到有条件运算符可用isEmpty,但是它似乎并不像我想要的那样.例如,使用isEmpty......

localObservable = Observable.create(new Observable.OnSubscribe<Object>() {
  @Override
  public void call(Subscriber<? super Object> subscriber) {
     List<Object> results = queryLocalDatabase();
     for (Object obj : results) {
       // Process object
       subscriber.onNext(object);           
     }
     subscriber.onCompleted();
  }
});

localObservable.isEmpty()
  .switchMap(new Func1<Boolean, Observable<? extends Object>>() {
    @Override …
Run Code Online (Sandbox Code Playgroud)

rx-java

13
推荐指数
1
解决办法
6457
查看次数