标签: rx-java

如何组成Observable以避免给定的嵌套和依赖回调?

这个博客中,他给出了这个(复制/粘贴以下代码)回调地狱的例子.但是,没有提到如何使用Reactive Extensions消除该问题.

所以这里F3取决于F1完成,F4和F5取决于F2完成.

  1. 想知道Rx中的功能等价物是什么.
  2. 如何在Rx中表示F1,F2,F3,F4和F5都应该异步拉出?

注意:我目前正试图绕过Rx,所以在问这个问题之前我没有尝试解决这个例子.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackB {

    /**
     * Demonstration of nested callbacks which then need to composes their responses together.
     * <p>
     * Various different approaches for composition can be done but eventually they end up relying upon
     * synchronization techniques such as the CountDownLatch used here or converge on callback design
     * changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>.
     */ …
Run Code Online (Sandbox Code Playgroud)

java future rx-java

20
推荐指数
2
解决办法
1万
查看次数

RxJava + Retrofit长轮询

我的问题是我无法获得无限的流Retrofit.在我获得初始poll()请求的凭证后 - 我做了初始poll()请求.如果没有更改,则每个poll()请求在25秒内响应,如果有任何更改,则每个poll()请求更早 - 返回changed_data [].每个响应都包含timestamp下一个轮询请求所需的数据 - 我应该在每个poll()响应之后执行新的poll()请求.这是我的代码:

getServerApi().getLongPollServer() 
  .flatMap(longPollServer -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollServer.getTs(), "") 
   .take(1) 
   .flatMap(longPollEnvelope -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollEnvelope.getTs(), ""))) 
   .retry()
   .subscribe(longPollEnvelope1 -> {
   processUpdates(longPollEnvelope1.getUpdates());
});
Run Code Online (Sandbox Code Playgroud)

我是RxJava的新手,也许我不明白,但我无法获得无限的流.我接到3个电话,然后是onNext和onComplete.

PS也许有更好的解决方案在Android上实现长轮询?

android long-polling rx-java retrofit

20
推荐指数
1
解决办法
2507
查看次数

在后台线程上处理Observable

我正在使用RxAndroid进行流操作.在我的实际用例中,我从服务器获取列表(使用Retrofit).我正在使用调度程序在后台线程上完成工作,并在Android UI(主)线程上获得最终发射.

这适用于网络调用,但是我意识到网络调用后我的运算符不使用后台线程,而是在主线程上调用.

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .subscribe(integer1 -> {});
Run Code Online (Sandbox Code Playgroud)

如何确保所有操作都在后台线程上执行?

rx-java rx-android

20
推荐指数
1
解决办法
2万
查看次数

在RxJava中处理Completable,Single,Maybe和终止Observable的最佳实践

我从Android的角度来问这个问题,但这应该适用于RxJava.

作为最佳实践,应我的观点始终处分甚至短暂的Completable,Single,Maybe和终止Observable的Rx类型应该在短期内结束,但是当用户关闭该视图可以仍在执行?我知道当Rx链终止时,它会被丢弃,但这可能会在视图关闭后的某个时候发生.

例如,Single正在执行HTTP GET.调用将完成,但可能是在视图被破坏后,暂时阻止垃圾回收.

如果a CompositeDisposable用于Disposable长期观察收集这些s,我会认为应该注意clear()Disposable定期删除这些s以防止无限大小的增长CompositeDisposable

android dispose rx-java rx-java2

20
推荐指数
2
解决办法
9428
查看次数

使用rxjava全局处理网络异常进行改造

我试图在全局级别的应用程序中处理异常,因此改造会抛出一个错误,我会在一些特定的类中捕获它,并使用逻辑来处理这些错误.

我有一个界面

@POST("/token")
AuthToken refreshToken(@Field("grant_type") String grantType, @Field("refresh_token") String refreshToken);
Run Code Online (Sandbox Code Playgroud)

和可观察的

/**
 * Refreshes auth token
 *
 * @param refreshToken
 * @return
 */
public Observable<AuthToken> refreshToken(String refreshToken) {
    return Observable.create((Subscriber<? super AuthToken> subscriber) -> {
        try {
            subscriber.onNext(apiManager.refreshToken(REFRESH_TOKEN, refreshToken));
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }).subscribeOn(Schedulers.io());
}
Run Code Online (Sandbox Code Playgroud)

当我从服务器获得401(无效令牌或其他一些网络相关的错误)时,我想刷新令牌并重复其余的呼叫.是否有一种方法可以使用rxjava对所有其余的调用使用某种可观察的全局捕获此错误,处理它并重复抛出它的调用?

现在我使用subject来捕获.subscribe()这样的错误

private static BehaviorSubject errorEvent = BehaviorSubject.create();

public static BehaviorSubject<RetrofitError> getErrorEvent() {
    return errorEvent;
}
Run Code Online (Sandbox Code Playgroud)

并在一些电话中

getCurrentUser = userApi.getCurrentUser().observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    (user) -> {
                        this.user = user;
                    },
                    errorEvent::onNext
            );
Run Code Online (Sandbox Code Playgroud)

然后在我的主要活动中,我订阅该行为主题并解析错误 …

android rx-java retrofit

19
推荐指数
1
解决办法
6222
查看次数

RxJava:d​​oOnError('callback')和subscribe(*,'callback')中的回调有什么区别

在我的上一个项目中,我使用rxJava并且我意识到observable.doOnError('onErrorCallback').subscribe(action)observable.subscribe(action, 'onErrorCallback')以不同的方式运行.即使是从文档中我也不明白它们之间的区别是什么,以及何时应该使用第一个和第二个变体.

java reactive-programming rx-java

19
推荐指数
1
解决办法
3594
查看次数

将AsyncTask转换为RxAndroid

我有以下方法使用otto和发布对UI的响应AsyncTask.

private static void onGetLatestStoryCollectionSuccess(final StoryCollection storyCollection, final Bus bus) {
    new AsyncTask<Void, Void, Void>() {
        @Override
        protected Void doInBackground(Void... params) {
            bus.post(new LatestStoryCollectionResponse(storyCollection));
            return null;
        }
    }.execute();
}
Run Code Online (Sandbox Code Playgroud)

我需要帮助将其转换AsyncTaskRxJava使用RxAndroid库.

android android-asynctask rx-java rx-android

19
推荐指数
3
解决办法
7850
查看次数

即使在另一个线程上调用了subscribeOn(),Observable也会在主线程上运行

我在其中一项活动中遇到了一个奇怪的问题.当从拍摄照片/视频回来时,onActivityResult我正在显示一个对话框,让用户为摄像机命名.一旦用户按下OK,我就会发送onNext()一个主题,该主题具有复制文件的请求文件名(并显示进度对话框).

由于某种原因map(),即使我打电话,也总是在主线程上调用执行复制的函数subscribeOn(Schedulers.io()).

@Override
protected void onActivityResult(final int requestCode, int resultCode, Intent intent) {
    ...

    final PublishSubject<String> subject = PublishSubject.create();`

    mSubscription = subject
            .subscribeOn(Schedulers.io())
            .map(new Func1<String, String>() {
                @Override
                public String call(String fileName) {
                    Log.I.d(TAG,"map");
                    return doSomeIOHeavyFuncition();
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<String>() {
                @Override
                public void call(final String fullPath) {
                    Log.d(TAG,"onNext");
                    doSomethingOnUI(fullPath);

                    subject.onCompleted();
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    ...
                }
            }, new Action0() {
                @Override
                public void …
Run Code Online (Sandbox Code Playgroud)

java android rx-java rx-android

19
推荐指数
1
解决办法
1万
查看次数

在Retrofit + RxJava中链接请求

我有2个API,我想按顺序请求并将它们的数据存储在SQLite中.

首先,我想向API发出请求A并将其数据存储在SQL表中a.然后请求API B并将其数据存储在表中,b并将一些数据存储在表中a_b.存储的数据a_b仅来自请求B.

我怎么能用RxJava做到这一点.我在这里读到了关于使用flatMap的地方,就像这样

apiService.A()
    // store in DB here? How? maybe use map()?
    .flatMap(modelA -> {
        // or maybe store modelA in DB here?
        return apiService.B().map(modelB -> {
            storeInDB()l // store B here ?
            return modelB;
        });
    });
Run Code Online (Sandbox Code Playgroud)

如果我没有使用lambda函数,这看起来就像普通的嵌套调用一样难看.这是一个更好的方法吗?

android rx-java retrofit2

19
推荐指数
2
解决办法
1万
查看次数

在RxJava2中处理null

随着即将推出的RxJava2版本的一个重要变化是null不再被接受为流元素,即以下代码将抛出异常:Observable.just(null)

老实说,我对这种变化有着复杂的感觉,我的一部分理解它会强制执行干净的API,但是当这可能是一个问题时,我可以看到许多用例.

例如,在我的应用程序中,我有一个内存缓存:

@Nullable CacheItem findCacheItem(long id);
Run Code Online (Sandbox Code Playgroud)

CacheItem可能不存在于缓存中,因此方法可能返回null值.

它与Rx*一起使用的方式如下:

Observable<CacheItem> getStream(final long id) {
    return Observable.fromCallable(new Callable<CacheItem>() {
        @Override public CacheItem call() throws Exception {
            return findCacheItem(id);
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

所以使用这种方法,我可能在我的流中得到null,这是完全有效的情况,所以它在接收端正确处理 - 假设UI改变其状态,如果项目不在缓存中:

Observable.just(user)
          .map(user -> user.getName())
          .map(name -> convertNameToId(name))
          .flatMap(id -> getStream(id))
          .map(cacheItem -> getUserInfoFromCacheItem(cacheItem))
          .subscribe(
              userInfo -> {
                  if(userInfo != null) showUserInfo();
                  else showPrompt();
              }
          );
Run Code Online (Sandbox Code Playgroud)

使用RxJava2,我不再允许null在流中发布消息,因此我需要将我的CacheItem包装到其他类中,并使我的流生成该包装,或者进行相当大的体系结构更改.

将每个流元素包装成可空的对应元素对我来说并不合适.

我错过了一些基本的东西吗?

看起来像我这样的情况非常受欢迎,所以我很好奇在RxJava2中给出新的"无空"政策时,解决这个问题的建议策略是什么?

编辑 请参阅RxJava GitHub仓库中的后续对话

rx-java

19
推荐指数
1
解决办法
7649
查看次数