标签: rx-java

Kotlin和RxJava - 为什么我的Single.zip()没有编译?

我在这里有点疯狂.我正在尝试创建一个Observable<BigDecimal>扩展函数(针对RxJava 2.x)来发出平均排放量,但是我得到了Single.zip()函数的编译错误.有没有人有任何想法我做错了什么?我试图明确我的所有类型,但是没有用......

import io.reactivex.Observable
import io.reactivex.Single
import java.math.BigDecimal


fun Observable<BigDecimal>.sum() = reduce { total, next -> total + next }

//compile error
fun Observable<BigDecimal>.average() = publish().autoConnect(2).let {
    Single.zip(it.sum().toSingle(), it.count()) {
        sum, count -> sum / BigDecimal.valueOf(count)
    }
}
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述

kotlin rx-java

23
推荐指数
2
解决办法
1万
查看次数

从普通Java事件创建Observable

Observable从传统的Java事件模式创建Rx-Java的最佳方法是什么?也就是说,给定

class FooEvent { ... }

interface FooListener {
  void fooHappened(FooEvent arg);
}

class Bar {
  public void addFooListener(FooListener l);
  public void removeFooListener(FooListener l);
}
Run Code Online (Sandbox Code Playgroud)

我想实施

Observable<FooEvent> fooEvents(Bar bar);
Run Code Online (Sandbox Code Playgroud)

我想出的实现是:

Observable<FooEvent> fooEvents(Bar bar) {
  return Observable.create(new OnSubscribeFunc<FooEvent>() {
    public Subscription onSubscribe(Observer<? super FooEvent> obs) {
      FooListener l = new FooListener() {
        public void fooHappened(FooEvent arg) {
          obs.onNext(arg);
        }
      };

      bar.addFooListener(l);

      return new Subscription() {
        public void unsubscribe() {
          bar.removeFooListener(l);
        }
      };
    }
  }); 
}
Run Code Online (Sandbox Code Playgroud)

但是,我真的不喜欢它:

  1. 它非常冗长;

  2. 需要一个监听器Observer(理想情况下,如果没有观察者,则应该没有监听器,否则一个监听器).这可以通过将观察者计数保持为一个字段, …

java events event-handling rx-java

22
推荐指数
2
解决办法
6860
查看次数

rxjava在创建observable后添加项目

我刚开始使用rxjava而且卡住了.也许我没有以正确的方式使用rxjava,但我需要Observable在创建后添加项目.所以我理解你可以只是打电话Observable.just("Some", "Items"),订阅者会收到它们,但是如果我有一个异步任务,我需要在任务完成后再添加一些项目呢?我找不到类似的东西Observable.addItems("Some", "More", "Items")

asynchronous observable rx-java rx-android

22
推荐指数
1
解决办法
9456
查看次数

使用RxJava和Okhttp

我想在另一个线程(如IO线程)中使用okhttp请求一个url并进入ResponseAndroid主线程,但我不知道如何创建一个Observable.

java android rx-java okhttp

22
推荐指数
3
解决办法
1万
查看次数

为什么在Android上使用Retrofit的RxJava doOnError()不起作用,但是Subscriber onError会起作用

谁能解释我为什么这样的代码:

 networApi.getList()
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(throwable -> {
                throwable.getMessage();
            })
            .doOnNext(list -> {
                coursesView.populateRecyclerView(list);
                courseList = (List<Course>) courses;
            }).subscribe();
Run Code Online (Sandbox Code Playgroud)

如果没有互联网进入doOnError但是进一步抛出它以便应用程序关闭,但代码如下:

networkApi.getList()
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<List<? extends Course>>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {
                    e.getMessage();
                }

                @Override
                public void onNext(List<? extends Course> list) {
                    coursesView.populateRecyclerView(list);
                    courseList = (List<Course>) list;
                }
            });
Run Code Online (Sandbox Code Playgroud)

按照我的预期工作,这意味着当没有互联网连接时,它什么都不做.

android subscriber rx-java retrofit rx-android

22
推荐指数
1
解决办法
6044
查看次数

Rx Observable定期发射值

我必须定期轮询一些RESTful端点以刷新我的Android应用程序的数据.我还必须根据连接暂停和恢复它(如果手机处于离线状态,则无需尝试).我目前的解决方案是有效的,但它使用标准的Java ScheduledExecutorService来执行周期性任务,但我想留在Rx范例中.

这是我当前的代码,为简洁起见,部分内容被跳过.

userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() {
    @Override
    public void call(final Subscriber<? super UserProfile> subscriber) {
        final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        final Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // making http request here
            }
        };
        final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1);
        networkStatusObservable.subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean networkAvailable) {
                if (!networkAvailable) {
                    pause();
                } else {
                    pause();                        
                    futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS));
                }
            }

            private void pause() {
                for (ScheduledFuture<?> future …
Run Code Online (Sandbox Code Playgroud)

android rx-java

21
推荐指数
3
解决办法
2万
查看次数

RxJava和缓存数据

我还是RxJava的新手,我在Android应用程序中使用它.我已经阅读了关于这个主题的公吨,但仍然觉得我错过了一些东西.

我有以下场景:

我有数据存储在系统中,可通过各种服务连接(AIDL)访问,我需要从该系统中检索数据(可能会发生1-n次异步调用).Rx帮助我简化了这段代码.但是,整个过程往往需要几秒钟(超过5秒+),因此我需要缓存此数据以加速本机应用程序.

此时的要求是:

  1. 初始订阅时,缓存将为空,因此我们必须等待所需的加载时间.没什么大不了.之后,应该缓存数据.

  2. 后续加载应该从缓存中提取数据,但是应该重新加载数据并且磁盘缓存应该在幕后.

问题:我有两个Observable - A和B. A包含从本地服务中提取数据的嵌套Observable(这里有吨).B更简单.B只包含从磁盘缓存中提取数据的代码.

需要解决:a)返回缓存项(如果已缓存)并继续重新加载磁盘缓存.b)缓存为空,从系统加载数据,缓存并返回.后续调用将返回"a".

我有几个人推荐一些操作,如flatmap,merge甚至主题,但由于某种原因,我无法连接点.

我怎样才能做到这一点?

android caching aidl rx-java

21
推荐指数
1
解决办法
1万
查看次数

RxJava:如何使用zip操作符处理错误?

我正在使用RxJava和RxAndroid与Retrofit2.

Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Run Code Online (Sandbox Code Playgroud)

在上面两个Observer上使用如下的zip操作符.

 Observable<ArrayList<TestData>> testDataObservable = Observable.zip(responseOneObservable, responseTwoObservable, new Func2<ResponseOne, ResponseTwo, ArrayList<TestData>>() {
            @Override
                public ArrayList<TestData> call(ResponseOne responseOne, ResponseTwo responseTwo) {
                  ArrayList<TestData> testDataList = new ArrayList();
                      // Add test data from response responseOne & responseTwo
                  return testDataList;
            } 
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<ArrayList<TestData>>() {

        @Override
        public void onNext(ArrayList<TestData> testDataList) {

        }

        @Override
        public void onCompleted() {
            Log.d(TAG, "onCompleted" );
        }

        @Override
        public void onError(Throwable t) {
            Log.d(TAG, "onError Throwable: …
Run Code Online (Sandbox Code Playgroud)

error-handling android rx-java rx-android retrofit2

21
推荐指数
1
解决办法
1万
查看次数

通过改造和rxjava 2.x处理空响应

当使用rxjava 1.xi用于返回Observable<Void>处理来自改造的空响应时:

@POST( "login" )
Observable<Void> getToken( @Header( "Authorization" ) String authorization,
                                       @Header( "username" ) String username,
                                       @Header( "password" ) String password );
Run Code Online (Sandbox Code Playgroud)

但是因为rxjava 2.x不会发出任何东西,Void是否有任何好的做法来处理那些空的响应?

android rx-java rx-java2

21
推荐指数
2
解决办法
7051
查看次数

RxJava和RxAndroid之间的区别?

为什么我们需要将RxAndroid与RxJava一起使用?它们与RxAndroid和RxJava的实际使用之间有什么功能差异?我找不到合适的答案.

android reactive-programming rx-java rx-android

21
推荐指数
2
解决办法
4763
查看次数