标签: rx-java

什么时候应该使用RxJava Observable和Android上的简单回调?

我正在为我的应用程序开发网络.所以我决定尝试Square的Retrofit.我看到他们支持简单Callback

@GET("/user/{id}/photo")
void getUserPhoto(@Path("id") int id, Callback<Photo> cb);
Run Code Online (Sandbox Code Playgroud)

和RxJava的 Observable

@GET("/user/{id}/photo")
Observable<Photo> getUserPhoto(@Path("id") int id);
Run Code Online (Sandbox Code Playgroud)

乍一看两者看起来都非常相似,但是当它实现时它变得有趣......

虽然简单的回调实现看起来类似于:

api.getUserPhoto(photoId, new Callback<Photo>() {
    @Override
    public void onSuccess() {
    }
});
Run Code Online (Sandbox Code Playgroud)

这非常简单明了.随着Observable它迅速变得冗长和复杂.

public Observable<Photo> getUserPhoto(final int photoId) {
    return Observable.create(new Observable.OnSubscribeFunc<Photo>() {
        @Override
        public Subscription onSubscribe(Observer<? super Photo> observer) {
            try {
                observer.onNext(api.getUserPhoto(photoId));
                observer.onCompleted();
            } catch (Exception e) {
                observer.onError(e);
            }

            return Subscriptions.empty();
        }
    }).subscribeOn(Schedulers.threadPoolForIO());
}
Run Code Online (Sandbox Code Playgroud)

那不是它.你仍然需要做这样的事情:

Observable.from(photoIdArray)
        .mapMany(new Func1<String, Observable<Photo>>() {
            @Override
            public Observable<Photo> call(Integer s) { …
Run Code Online (Sandbox Code Playgroud)

android rx-java retrofit

251
推荐指数
4
解决办法
8万
查看次数

rxJava调度程序用例

在RxJava中有5种不同的调度程序可供选择:

  1. immediate():创建并返回一个在当前线程上立即执行工作的Scheduler.

  2. trampoline():创建并返回一个调度程序,该调度程序对当前工作完成后要执行的当前线程进行排队.

  3. newThread():创建并返回一个Scheduler,为每个工作单元创建一个新的Thread.

  4. computation():创建并返回用于计算工作的Scheduler.这可以用于事件循环,处理回调和其他计算工作.不要在此调度程序上执行IO绑定的工作.使用调度程序.io()代替.

  5. io():创建并返回一个用于IO绑定工作的Scheduler.该实现由Executor线程池支持,该线程池将根据需要增长.这可用于异步执行阻塞IO.不要在此调度程序上执行计算工作.使用调度程序.计算()而不是.

问题:

前3个调度程序非常自我解释; 但是,我对计算io有点困惑.

  1. 究竟什么是"IO限制工作"?它用于处理streams(java.io)和files(java.nio.files)吗?它用于数据库查询吗?它是用于下载文件还是访问REST API?
  2. 如何计算()从不同newThread() ?是每次所有的calculate()调用都在单个(后台)线程而不是新的(后台)线程上吗?
  3. 为什么在进行IO工作时调用calculate()会很糟糕?
  4. 为什么在进行计算工作时调用io()会很糟糕?

java multithreading thread-safety rx-java rx-android

245
推荐指数
2
解决办法
2万
查看次数

CompletableFuture,Future和RxJava的Observable之间的区别

我想知道的区别 CompletableFuture,FutureObservable RxJava.

我所知道的都是异步但是

Future.get() 阻止线程

CompletableFuture 给出了回调方法

RxJava Observable--- CompletableFuture与其他好处相似(不确定)

例如:如果客户端需要进行多次服务调用,那么当我们使用Futures(Java)时Future.get()将按顺序执行...想知道它在RxJava中的表现如何...

文档http://reactivex.io/intro.html

很难使用Futures来优化组合条件异步执行流程(或者不可能,因为每个请求的延迟在运行时会有所不同).当然,这可以完成,但它很快变得复杂(因此容易出错)或者过早地阻塞Future.get(),这消除了异步执行的好处.

真的很想知道如何RxJava解决这个问题.我发现从文档中很难理解.

java multithreading asynchronous java-8 rx-java

173
推荐指数
4
解决办法
5万
查看次数

你什么时候在RxJava中使用map vs flatMap?

你什么时候在RxJava中使用map vs flatMap?

比方说,我们想将包含JSON的文件映射到包含JSON的字符串中 -

使用map,我们必须以某种方式处理Exception.但是如何?:

Observable.from(jsonFile).map(new Func1<File, String>() {
    @Override public String call(File file) {
        try {
            return new Gson().toJson(new FileReader(file), Object.class);
        } catch (FileNotFoundException e) {
            // So Exception. What to do ?
        }
        return null; // Not good :(
    }
});
Run Code Online (Sandbox Code Playgroud)

使用flatMap,它更加冗长,但我们可以将问题转发到Observables链中,如果我们选择其他地方甚至重试,则可以处理错误:

Observable.from(jsonFile).flatMap(new Func1<File, Observable<String>>() {
    @Override public Observable<String> call(final File file) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override public void call(Subscriber<? super String> subscriber) {
                try {
                    String json = new Gson().toJson(new FileReader(file), Object.class);

                    subscriber.onNext(json);
                    subscriber.onCompleted();
                } …
Run Code Online (Sandbox Code Playgroud)

java mapping rx-java flatmap

172
推荐指数
8
解决办法
10万
查看次数

RxJava中flatmap和switchmap有什么区别?

rxjava DOC switchmap的定义是相当含糊,并将其链接到同一页作为flatmap.这两个运营商有什么区别?

reactive-programming rx-java

139
推荐指数
5
解决办法
5万
查看次数

Java 8流和RxJava可观察量之间的差异

Java 8流是否类似于RxJava observables?

Java 8流定义:

java.util.stream包中的类提供Stream API以支持对元素流的功能样式操作.

observable java-8 rx-java java-stream

137
推荐指数
5
解决办法
4万
查看次数

Observable vs Flowable rxJava2

我一直在寻找新的rx java 2,我不太确定我理解了这个想法backpressure......

我知道我们有Observable没有backpressure支持,Flowable而且有它.

因此,基于例如,可以说我有flowableinterval:

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });
Run Code Online (Sandbox Code Playgroud)

这将在大约128个值之后崩溃,这很明显我消耗的速度比获取项目慢.

但后来我们也一样 Observable

     Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });
Run Code Online (Sandbox Code Playgroud)

这根本不会崩溃,即使我延迟消耗它仍然有效.为了让Flowable工作让我说我放onBackpressureDrop操作员,崩溃已经消失,但并非所有值都被发出.

所以我目前无法找到答案的基本问题是我为什么要关心backpressure何时可以使用普通Observable仍然可以在不管理的情况下获得所有值buffer?或者从另一方面来看,有什么优势backpressure可以帮助我管理和处理消费?

java android rx-java

115
推荐指数
3
解决办法
4万
查看次数

使用Retrofit 2.0和RxJava获取响应状态代码

我正在尝试升级到Retrofit 2.0并在我的android项目中添加RxJava.我正在进行api调用,并希望在服务器发出错误响应的情况下检索错误代码.

Observable<MyResponseObject> apiCall(@Body body);
Run Code Online (Sandbox Code Playgroud)

并在RxJava调用中:

myRetrofitObject.apiCall(body).subscribe(new Subscriber<MyResponseObject>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(MyResponseObject myResponseObject) {
           //On response from server
        }
    });
Run Code Online (Sandbox Code Playgroud)

在Retrofit 1.9中,RetrofitError仍然存在,我们可以通过以下方式获得状态:

error.getResponse().getStatus()
Run Code Online (Sandbox Code Playgroud)

如何使用RxJava进行Retrofit 2.0?

android android-networking rx-java retrofit rx-android

101
推荐指数
3
解决办法
5万
查看次数

RxJava中Observable,Completable和Single之间有什么区别

任何人都可以用明确的例子解释RxJava中Observable,Completable和Single之间的区别吗?

在哪种情况下我们使用其中一个?

rx-java rx-java2

95
推荐指数
2
解决办法
2万
查看次数

rxjava:我可以使用retry()但是有延迟吗?

我在我的Android应用程序中使用rxjava来异步处理网络请求.现在我想在一段时间过后才重试失败的网络请求.

有没有办法在Observable上使用retry()但只能在一定延迟后重试?

有没有办法让Observable知道当前正在重试(而不是第一次尝试)?

我看了一下debounce()/ throttleWithTimeout(),但他们似乎做了不同的事情.

编辑:

我想我找到了一种方法,但是我会对确认这是正确的方法或其他更好的方法感兴趣.

我正在做的是:在我的Observable.OnSubscribe的call()方法中,在我调用Subscribers onError()方法之前,我只是让Thread睡眠所需的时间.所以,要每1000毫秒重试一次,我会这样做:

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e.printStackTrace();
        }
        subscriber.onError(e);
    }
}
Run Code Online (Sandbox Code Playgroud)

由于此方法无论如何都在IO线程上运行,因此它不会阻止UI.我能看到的唯一问题是,即使是第一个错误也会报告延迟,所以即使没有重试(),也会出现延迟.如果延迟没有错误之后应用,而是重试之前(但不是在第一次尝试之前,显然),我会更喜欢它.

rx-java

86
推荐指数
8
解决办法
4万
查看次数