标签: rx-java2

RxJava2 - Mutlicasting单曲,Maybes和Completables?

组播Singles,Maybes和Completables的推荐方法是什么?是否只是建议将它们变成标准的Observable来组播它们?

    Observable<String> strings =
            Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");

    Observable<Integer> lengths = strings.map(String::length);

    ConnectableObservable<List<Integer>> lengthsList = 
            lengths.toList().toObservable().publish();
Run Code Online (Sandbox Code Playgroud)

我有点惊讶,没有ConnectableSingle,ConnectableMaybeConnectableCompletable.这背后的原因是什么?开发的努力是不值得的?

java reactive-programming rx-java rx-java2

5
推荐指数
1
解决办法
525
查看次数

RxJava1 vs Rxjava2:onNext中的异常

在RxJava1中执行以下操作时,onNext中的异常将重新路由到同一订户的onError中:

    Observable.from(Arrays.asList("1", "22", "333", "4444")).subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {
            Log.d("RxJava1", "onError: " + e.getCause());
        }

        @Override
        public void onNext(String s) {
            if (s.length() == 4) {
                Integer test = null;
                test.hashCode();
            }
            Log.d("RxJava1", s + " - " + s.length());
        }
    });
Run Code Online (Sandbox Code Playgroud)

输出:

D/RxJava1: 1 - 1
D/RxJava1: 22 - 2
D/RxJava1: 333 - 3
D/RxJava1: onError: null
Run Code Online (Sandbox Code Playgroud)

据我所知,在RxJava2中这样做时,此行为已更改,不再返回onError,而只是崩溃:

    Observable.fromIterable(Arrays.asList("1", "22", "333", "4444")).subscribeWith(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) …
Run Code Online (Sandbox Code Playgroud)

rx-java rx-java2

5
推荐指数
1
解决办法
412
查看次数

如何在RxJava 2中发生错误后继续处理?

我有一个PublishSubject和一个Subscriber我用来处理(可能)无限的预处理数据流.问题是某些元素可能包含一些错误.我想忽略它们并继续处理.我怎么能这样做?我尝试过这样的事情:

    val subject = PublishSubject.create<String>()
    subject.retry().subscribe({
        println("next: $it")
    }, {
        println("error")
    }, {
        println("complete")
    })

    subject.onNext("foo")
    subject.onNext("bar")
    subject.onError(RuntimeException())
    subject.onNext("wom")
    subject.onComplete()
Run Code Online (Sandbox Code Playgroud)

我的问题是没有任何错误处理方法可以帮助我:

onErrorResumeNext() - 指示Observable在遇到错误时发出一系列项目

onErrorReturn(?) - 指示Observable在遇到错误时发出特定项目

onExceptionResumeNext(?) - 指示Observable在遇到异常后继续发出项目(但不是另一种可抛出的项目)

retry(?) - 如果源Observable发出错误,请重新订阅它,希望它能完成而不会出错

retryWhen(?) - 如果源Observable发出错误,请将该错误传递给另一个Observable以确定是否重新订阅源

我尝试retry()了例子,但它无限期地在错误之后挂起我的进程.

我也试过,onErrorResumeNext()但它没有按预期工作:

    val backupSubject = PublishSubject.create<String>()
    val subject = PublishSubject.create<String>()
    var currentSubject = subject
    subject.onErrorResumeNext(backupSubject).subscribe({
        println("next: $it")
    }, {
        println("error")
        currentSubject = backupSubject
    }, {
        println("complete")
    })

    backupSubject.subscribe({
        println("backup")
    }, {
        println("backup error")
    })

    currentSubject.onNext("foo")
    currentSubject.onNext("bar") …
Run Code Online (Sandbox Code Playgroud)

java kotlin rx-java2

5
推荐指数
1
解决办法
2522
查看次数

如何在RxJava2中使用带有lambda表达式的DisposableObserver

我的用例是想在我的onNext中的某个条件之后处理.所以尝试使用DisposableObserver.这是有效的代码

Observable.just(1, 2, 3, 4)
    .subscribe(new DisposableObserver<Integer>() {
                     @Override
                     public void onNext(Integer integer) {
                       System.out.println("onNext() received: " + integer);
                       if (integer == 2) {
                         dispose();
                       }
                     }
                     @Override
                     public void onError(Throwable e) { System.out.println("onError()"); }
                     @Override
                     public void onComplete() { System.out.println("onComplete()"); }
                   }
    );
Run Code Online (Sandbox Code Playgroud)

现在,如果你尝试用lambda替换它,它会将lambda视为

subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
Run Code Online (Sandbox Code Playgroud)

现在这样做.通过从onSubscribe保存一次性用品,然后调用disposable.dispose(); 来自onNext.

  private Disposable disposable;
  private void disposableObserverTest() {
    Observable.just(1, 2, 3, 4)
        .subscribe(integer -> {
              System.out.println("onNext() received: " + integer);
              if (integer == 2) { …
Run Code Online (Sandbox Code Playgroud)

lambda android rx-java rx-android rx-java2

5
推荐指数
1
解决办法
1348
查看次数

rxJava debounce运算符不使用Observable.range()

AFAIK debounce()是rxJava 的运营商,用于延迟事件的发射.当我在搜索框中应用它时,它正常工作:

RxTextView.textChangeEvents(editText)
                .debounce(1000, TimeUnit.MILLISECONDS) //Only emit after 1 sec
                .subscribe(new Observer<TextViewTextChangeEvent>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(TextViewTextChangeEvent event) {
                        //Get more information about text change event
                        Log.e(TAG, "Before: " + event.before() + ", start: " + event.start() + ", count: " + event.count());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "textChangeEvents: onComplete");
                    }
                });
Run Code Online (Sandbox Code Playgroud)

但是当我Observable.range()像这样应用它时:

Observable.range(1, 10000)
                .debounce(1000, TimeUnit.MILLISECONDS)
                .subscribe(new Observer<Long>() { …
Run Code Online (Sandbox Code Playgroud)

java android rx-java rx-android rx-java2

5
推荐指数
1
解决办法
1028
查看次数

Rxjava2,Java 8 Streams,Plain Old Iteration之间的性能比较

我已经成为Java 8中Java函数编程以及Rx java的忠实粉丝.但是一位同事最近指出,使用这些产品会影响性能.因此决定运行JMH Bench标记,但看起来他是对的.无论我做什么,我都无法获得流版本以提供更好的性能.以下是我的代码

@OutputTimeUnit(TimeUnit.NANOSECONDS)
@BenchmarkMode(Mode.AverageTime)
@OperationsPerInvocation(StreamVsVanilla.N)
public class StreamVsVanilla {
    public static final int N = 10000;

    static List<Integer> sourceList = new ArrayList<>(N);
    static {
        for (int i = 0; i < N; i++) {
            sourceList.add(i);
        }
    }

    @Benchmark
    public List<Double> vanilla() {
        List<Double> result = new ArrayList<Double>(sourceList.size() / 2 + 1);
        for (Integer i : sourceList) {
            if (i % 2 == 0){
                result.add(Math.sqrt(i));
            }
        }
        return result;
    }

    @Benchmark
    public List<Double> stream() {
        return  sourceList.stream().parallel()
                .mapToInt(Integer::intValue)
                .filter(i …
Run Code Online (Sandbox Code Playgroud)

performance java-8 java-stream rx-java2

5
推荐指数
1
解决办法
812
查看次数

致命异常:java.io.InterruptedIOException:与rxjava一起使用改造时线程中断

我收到此随机异常,导致在进行翻新的网络通话时崩溃了我的应用程序,并且正在寻找解决方法的指导

Fatal Exception: java.io.InterruptedIOException: thread interrupted
       at okio.Timeout.throwIfReached(Timeout.java:145)
       at okio.Okio$1.write(Okio.java:76)
       at okio.AsyncTimeout$1.write(AsyncTimeout.java:180)
       at okio.RealBufferedSink.flush(RealBufferedSink.java:216)
       at okhttp3.internal.http2.Http2Writer.flush(Http2Writer.java:121)
       at okhttp3.internal.http2.Http2Connection.newStream(Http2Connection.java:239)
       at okhttp3.internal.http2.Http2Connection.newStream(Http2Connection.java:205)
       at okhttp3.internal.http2.Http2Codec.writeRequestHeaders(Http2Codec.java:111)
       at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:50)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at com.happycorp.happy.happyapp.util.Network$1.intercept(Network.java:80)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at com.happycorp.android.commondata.net.RetrofitFactory$CustomHttpMetricsLogger.intercept(RetrofitFactory.java:139)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:125)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at com.happycorp.android.commondata.net.RetrofitFactory$1.intercept(RetrofitFactory.java:83)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:212)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
       at okhttp3.RealCall.execute(RealCall.java:77)
       at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
       at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:41) …
Run Code Online (Sandbox Code Playgroud)

android retrofit2 okhttp3 rx-java2

5
推荐指数
1
解决办法
3068
查看次数

RxJava:并行执行Single列表,并以相同的顺序将结果放入列表中

我有一个网络调用列表(他们返回一个Single<Item>),我想并行执行它们.一旦所有的呼叫成功,我想得到一个List<Item>,按照与之相同的顺序List<Single<Item>>.

我已经设法做到并且它有效,但它似乎有点太复杂,我猜我必须有一个更简单的方法,因为这似乎是一个非常常见的用例?

这是我做的:

    List<Single<Pair<Item, Integer>>> itemSingles = new ArrayList<>();
    for (int index = 0; index < itemCount - 1; index++) {
        int finalIndex = index;
        itemSingles.add(
                fetchItem(...)
                .map(item -> new Pair<>(item, finalIndex))
        );
    }

    Single.merge(itemSingles)
            .sorted((o1, o2) -> o1.second.compareTo(o2.second))
            .map(itemPair -> itemPair.first)
            .toList()
            .subscribe(items -> { ... });
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,我必须使用它Pair<Item, Integer>来绑定每个项目的位置,然后我可以使用该位置进行排序.

有更简单的方法吗?

rx-java rx-java2

5
推荐指数
1
解决办法
983
查看次数

在Kotlin中将密封类与rxjava一起使用时获取类型不匹配

我有以下代码

sealed class AddressUiState
object AddressLoading : AddressUiState()
class AddressLoadedState(val addressResponse: AddressBookResponse) : AddressUiState()
class AddressErrorState(val error: Throwable) : AddressUiState()
Run Code Online (Sandbox Code Playgroud)

我有如下的ViewModel

class AddressViewModel constructor(private val service: SingleProfileService) : ViewModel() {

    fun getDisplayableAddressState(id: String): Observable<out AddressUiState> {
        return service.getAddresses(id)
                .map { AddressLoadedState(it) }
                .startWith(AddressLoading)
                .onErrorReturn { AddressErrorState(it) }
                .subscribeOn(Schedulers.io())

    }
}
Run Code Online (Sandbox Code Playgroud)

我看到编译错误和错误类型不匹配的 onErrorReturn 必填:AddressLoadedState!找到:AddressErrorState 上面的代码有什么问题?

kotlin rx-java rx-java2

5
推荐指数
1
解决办法
545
查看次数

RxJava使用优化请求

今天我试着解决一个小小的挑战:

您是一家拥有500个办事处的大公司,您想要计算全球收入(每个办事处的收入总和).

每个办公室都提供服务以获得收入.该呼叫需要一定的延迟(网络,数据库访问,......).

显然,您希望尽可能快地获得全球收入.

首先我在python中尝试了非常好的结果:

import asyncio
import time

DELAYS = (475, 500, 375, 100, 250, 125, 150, 225, 200, 425, 275, 350, 450, 325, 400, 300, 175)


class Office:

    def __init__(self, delay, name, revenue):
        self.delay = delay
        self.name = name
        self.revenue = revenue

    async def compute(self):
        await asyncio.sleep(self.delay / 1000)
        print(f'{self.name} finished in {self.delay}ms')
        return self.revenue


async def main(offices, totest):
    computed = sum(await asyncio.gather(*[o.compute() for o in offices]))
    verdict = ['nok', 'ok'][computed == totest]
    print(f'Sum of revenues = {computed} …
Run Code Online (Sandbox Code Playgroud)

java rx-java rx-java2

5
推荐指数
1
解决办法
97
查看次数