标签: rx-java

RX:并行运行Zipped Observables?

所以我正在玩RX(非常酷),我一直在转换我的api,它访问Android中的sqlite数据库以返回observables.

所以我开始尝试解决的问题之一就是"如果我想要进行3次API调用,得到结果,然后在完成后再进行一些处理,该怎么办?"

我花了一个小时或两个小时,但我最终找到了Zip功能,它帮助我轻松地:

    Observable<Integer> one = getNumberedObservable(1);
    Observable<Integer> two = getNumberedObservable(2);
    Observable<Integer> three = getNumberedObservable(3);

    Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer arg0, Integer arg1, Integer arg2) {
            System.out.println("Zip0: " + arg0);
            System.out.println("Zip1: " + arg1);
            System.out.println("Zip2: " + arg2);
            return arg0 + arg1 + arg2;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer arg0) {
            System.out.println("Zipped Result: " + arg0);
        }
    });

public static Observable<Integer> getNumberedObservable(final int value) {
    return Observable.create(new …
Run Code Online (Sandbox Code Playgroud)

java system.reactive rx-java

15
推荐指数
2
解决办法
9728
查看次数

使用Retrofit rxjava concatWith时堆栈溢出

我想使用rxjava Observable处理Retrofit中的分页.我听从了另一个问题的建议.

我有超过100个页面需要被提取,但链在第20页的页面失败并停止对logserv的进一步订阅以及logcat中的以下日志

04-04 04:12:11.766    2951-3012/com.example.app I/dalvikvm? threadid=28: stack overflow on call to Ljava/util/concurrent/atomic/AtomicLongFieldUpdater$CASUpdater;.compareAndSet:ZLJJ
04-04 04:12:11.766    2951-3012/com.example.app I/dalvikvm? method requires 56+20+32=108 bytes, fp is 0x94b52350 (80 left)
04-04 04:12:11.766    2951-3012/com.example.app I/dalvikvm? expanding stack end (0x94b52300 to 0x94b52000)
04-04 04:12:11.766    2951-3012/com.example.app I/dalvikvm? Shrank stack (to 0x94b52300, curFrame is 0x94b548dc)
Run Code Online (Sandbox Code Playgroud)

有人知道为什么会这样吗?

更新:我知道这是由于递归而发生的,但是有一种更优雅的方式来处理改装和rxjava的分页吗?

java android rx-java retrofit rx-android

15
推荐指数
1
解决办法
3875
查看次数

RxJava-在Observable链中执行peek()或void操作?

Java 8 lambda流有一个peek()运算符,允许您对每个项执行void操作.这通常用于调试,但它也是一种欺骗和启动void操作而不映射到某些东西的好方法.

在RxJava中是否有相同的功能?也许我没有遵循良好的做法或反应性思考......但是在操作之前和之后创建状态标签会非常方便吗?如果peek()不受支持,是否有更好的模式可供遵循?

Observable<Item> Item= ...;

Label statusLabel = new Label();
Label resultLabel = new Label();

Observable<CalculatedItem> calculatedItem = calculated.subscribeOn(Schedulers.computation())
.peek(c -> statusLabel.setText("Working.."))
.map(c -> performExpensiveCalculation(c))
.peek(r -> statusLabel.setText(""));

calculatedItem.subscribe(c -> resultLabel.setText(c));
Run Code Online (Sandbox Code Playgroud)

java reactive-programming rx-java

15
推荐指数
1
解决办法
3725
查看次数

RxJava Observable.cache无效

我想在Android环境中学习rxjava.假设我有一个可观察到的网络调用结果.如果我理解正确,处理配置更改的一种常见方法是:

  • 将observable存储在保留的片段/单例/应用程序对象中

  • 缓存运算符应用于observable

  • 订阅/取消订阅适当的生命周期处理程序

这样做,我们不会松开一旦新配置发生后将重新观察到的observable的结果.

现在,我的问题是:

有没有办法强制observable发出一个新值(并使缓存的值无效)?每次我需要来自网络的新数据时,我是否需要创建一个新的observable(这听起来不像Android世界中的坏习惯,因为这会让gc做额外的工作)?

非常感谢,

费德里科

android rx-java

15
推荐指数
1
解决办法
3998
查看次数

Schedulers.io()和Schedulers.computation()之间的区别是什么

在沙发基地,Observables

有什么区别:Schedulers.io()和Schedulers.computation()

observable couchbase rx-java

15
推荐指数
2
解决办法
6954
查看次数

io.reactivex.exceptions.UndeliverableException 异常无法传递给消费者,因为它已经被取消/处置

使用时出现 UndeliverableExceptioncompletable

public Completable createBucketWithStorageClassAndLocation() {
        return Completable.complete()
                .doFinally(() -> {
            Bucket bucket =
                    storage.create(
                            BucketInfo.newBuilder(googleUploadObjectConfiguration.bucketName())
                                    .setStorageClass(storageClass)
                                    .setLocation(googleUploadObjectConfiguration.locationName())
                                    .build());       
        }).doOnError(error -> LOG.error(error.getMessage()));
    }
Run Code Online (Sandbox Code Playgroud)

异常是从 Google 存储中抛出的,这是正确的,但尝试处理doOnError方法

Caused by: com.google.cloud.storage.StorageException: You already own this bucket. Please select another name.
Run Code Online (Sandbox Code Playgroud)

RXJava 异常

io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | com.google.cloud.storage.StorageException: You already own this bucket. …
Run Code Online (Sandbox Code Playgroud)

java rx-java reactivex rx-java2 rx-java3

15
推荐指数
2
解决办法
3万
查看次数

在文本更改事件之前自动调用RxAndroid textview事件

我在编辑文本搜索中使用rxandroid进行去抖操作

我用了

private void setUpText() {
        _mSubscription = RxTextView.textChangeEvents(searchStation)//
                .debounce(500, TimeUnit.MILLISECONDS)// default Scheduler is Computation
                .observeOn(AndroidSchedulers.mainThread())//
                .subscribe(getOps().getdata());
    }
Run Code Online (Sandbox Code Playgroud)

和观察者一样

public Observer<TextViewTextChangeEvent> getdata()
    {

        return new Observer<TextViewTextChangeEvent>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                 e.printStackTrace();
            }

            @Override
            public void onNext(TextViewTextChangeEvent onTextChangeEvent) {
//                stationsugession(onTextChangeEvent.text().toString());

                //here i called link to get the data from the server
            }
        };
    }
Run Code Online (Sandbox Code Playgroud)

我的问题是甚至在任何edittext更改发生之前调用链接.它没有调用textchange事件.我错过了什么我在这里做错了什么.我是rxandroid和rxjava的新手.

我用了

  compile 'io.reactivex:rxandroid:1.1.0'
    compile 'io.reactivex:rxjava:1.1.0'
    compile 'com.jakewharton.rxbinding:rxbinding:0.2.0'
Run Code Online (Sandbox Code Playgroud)

编辑:

它现在可以工作,我在我的逻辑中得到一个空指针获取列表..当我使用onError方法并放置一个堆栈跟踪我发现问题.

如果你想跳过初始调用,那么我们应该将.skip(1)调用 到你的订阅对象.[感谢Daniel Lew ] …

android rx-java rx-android

14
推荐指数
1
解决办法
8684
查看次数

Android Retrofit 2 + RxJava:听听层出不穷

我可以使用Retrofit + RxJava来聆听无尽的流吗?例如Twitter流.我有的是这个:

public interface MeetupAPI {
    @GET("http://stream.meetup.com/2/rsvps/")
    Observable<RSVP> getRSVPs();
}

MeetupAPI api = new Retrofit.Builder()
            .baseUrl(MeetupAPI.RSVP_API)
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .addConverterFactory(GsonConverterFactory.create())
            .build()
            .create(MeetupAPI.class);

api.getRSVPs()
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(rsvp -> Log.d(TAG, "got rsvp"),
                error -> Log.d(TAG, "error: " + error),
                () -> Log.d(TAG, "onComplete"));
Run Code Online (Sandbox Code Playgroud)

但在解析第一个对象后调用"onComplete".有没有办法告诉Retrofit保持开放直到另行通知?

android rx-java retrofit retrofit2

14
推荐指数
1
解决办法
4203
查看次数

获取Observable的最新值并立即发出

我正在尝试获取给定的最新值,Observable并在调用后立即将其发出.以下面的代码为例:

return Observable.just(myObservable.last())
    .flatMap(myObservable1 -> {
        return myObservable1;
    })
    .map(o -> o.x) // Here I want to end up with a T object instead of Observable<T> object
Run Code Online (Sandbox Code Playgroud)

这不起作用,因为通过这样做,flatMap将发射myObservable1,然后必须发射,以达到map.我不知道是否可以做这样的事情.有没有人知道如何实现这一目标?谢谢

java reactive-programming rx-java rx-android

14
推荐指数
1
解决办法
4万
查看次数

如何使用RxJava Interval Operator

我正在学习RxJava运算符,我发现下面这些代码没有打印任何东西:

public static void main(String[] args) {

    Observable
    .interval(1, TimeUnit.SECONDS)
    .subscribe(new Subscriber<Long>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("onError -> " + e.getMessage());
        }

        @Override
        public void onNext(Long l) {
            System.out.println("onNext -> " + l);
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

作为ReactiveX, interval

创建一个Observable,它发出一个由特定时间间隔隔开的整数序列

我犯了错误还是忘了什么?

rx-java

14
推荐指数
2
解决办法
1万
查看次数