标签: rx-java

为什么这段代码中没有调用OnComplete?(接收安卓)

我知道当所有项目都发出时,会调用观察者的 OnComplete 。在下面的代码中,我将数据从游标放入 flatMap 运算符中的 ArrayList。我的光标有 100 个条目( c.getCount() 给出 100 ),我的列表大小是 100。 onNext 也被调用 100 次。但 onComplete 没有被调用。我正在 onComplete 中填充列表视图。

static int i = 0;
final List<String> ar = new ArrayList<>();
ListView lv = ...;
ArrayAdapter<String> adapter = ...;   
.
.
. 
q.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
                    .flatMap(new Func1<SqlBrite.Query, Observable<String>>() {
                @Override
                public Observable<String> call(SqlBrite.Query query) {
                    Cursor c = query.run();
                    c.moveToFirst();
                    Log.d("testApp", String.valueOf(c.getCount())); // prints 100
                    do {
                        ar.add(c.getString(0));
                    } while (c.moveToNext());
                    Log.d("testApp", String.valueOf(ar.size())); // prints 100
                    return Observable.from(ar);
                }
            }).subscribe(new Observer<String>() …
Run Code Online (Sandbox Code Playgroud)

android rx-java rx-android

1
推荐指数
1
解决办法
3536
查看次数

RxJava:将 Rx Flowable 拆分为多个流

我想对流执行一些操作,然后将流分成两个流,然后分别处理它们。

显示问题的示例:

Flowable<SuccessfulObject> stream = Flowable.fromArray(
        new SuccessfulObject(true, 0),
        new SuccessfulObject(false, 1),
        new SuccessfulObject(true, 2));

stream = stream.doOnEach(System.out::println);

Flowable<SuccessfulObject> successful = stream.filter(SuccessfulObject::isSuccess);
Flowable<SuccessfulObject> failed = stream.filter(SuccessfulObject::isFail);

successful.doOnEach(successfulObject -> {/*handle success*/}).subscribe();
failed.doOnEach(successfulObject -> {/*handle fail*/}).subscribe();
Run Code Online (Sandbox Code Playgroud)

班级:

class SuccessfulObject {
    private boolean success;
    private int id;

    public SuccessfulObject(boolean success, int id) {
        this.success = success;
        this.id = id;
    }

    public boolean isSuccess() {
        return success;
    }
    public boolean isFail() {
        return !success;
    }

    public void setSuccess(boolean success) {
        this.success = success;
    } …
Run Code Online (Sandbox Code Playgroud)

java stream rx-java

1
推荐指数
1
解决办法
1859
查看次数

Vertx HttpCllentRequest 重定向

我们如何处理 vertx HttpClientRequest 中的重定向(302 响应代码)。是否可以让 vertx 本身处理重定向,或者我们必须显式处理。如果需要明确执行,请解释如何执行。

java vert.x rx-java vertx-httpclient

1
推荐指数
1
解决办法
2172
查看次数

Java 8 流上的缓冲区运算符

我正在尝试编写一个 java 8 流收集器,它反映了rxjava 缓冲区运算符的功能

我有一个工作代码:

// This will gather numbers 1 to 13 and combine them in groups of
// three while preserving the order even if its a parallel stream.
final List<List<String>> triads = IntStream.range(1, 14)
        .parallel()
        .boxed()
        .map(Object::toString)
        .collect(ArrayList::new, accumulator, combiner);

System.out.println(triads.toString())
Run Code Online (Sandbox Code Playgroud)

这里的累加器是这样的:

final BiConsumer<List<List<String>>, String> accumulator = (acc, a) -> {
      StringBuilder stringBuilder = new StringBuilder();
      stringBuilder.append("Accumulator|");
      stringBuilder.append("Before: ").append(acc.toString());
      int accumulatorSize = acc.size();
      if (accumulatorSize == 0) {
        List<String> newList = new ArrayList<>();
        newList.add(a);
        acc.add(newList); …
Run Code Online (Sandbox Code Playgroud)

java functional-programming java-8 rx-java java-stream

1
推荐指数
1
解决办法
3543
查看次数

RXjava onError 在单个上不起作用

我正在尝试使用 rx java 调用 http 调用。

它返回一个 Single 对象。

这是代码:

 service.getEvent(eventId)
    .onErrorResumeNext(exception -> Single.error(exception))
    .doOnError(throwable -> log.error(throwable.getMessage())
    .subscribe(this::handleEvent)
Run Code Online (Sandbox Code Playgroud)

日志打印“doOnError”中预期的日志行,但也打印堆栈跟踪:

 ERROR c.b.w.s.Service:161 - Error: Error, eventId: dummy-event

rx.exceptions.OnError
NotImplementedException: Error
    at rx.functions.Actions$NotImplemented.call(Actions.java:576)
    at rx.functions.Actions$NotImplemented.call(Actions.java:572)
    at rx.Single$11.onError(Single.java:1782)
    at rx.internal.operators.SingleDoOnEvent$SingleDoOnEventSubscriber.onError(SingleDoOnEvent.java:76)
    at rx.Single$1.call(Single.java:460)
    at rx.Single$1.call(Single.java:456)
    at rx.Single.subscribe(Single.java:1967)
    at rx.internal.operators.SingleOperatorOnErrorResumeNext$2.onError(SingleOperatorOnErrorResumeNext.java:69)
    at rx.Single$1.call(Single.java:460)
    at rx.Single$1.call(Single.java:456)
    at rx.Single.subscribe(Single.java:1967)
    at rx.internal.operators.SingleOperatorOnErrorResumeNext.call(SingleOperatorOnErrorResumeNext.java:77)
    at rx.internal.operators.SingleOperatorOnErrorResumeNext.call(SingleOperatorOnErrorResumeNext.java:23)
    at rx.Single.subscribe(Single.java:1967)
    at rx.internal.operators.SingleDoOnEvent.call(SingleDoOnEvent.java:40)
    at rx.internal.operators.SingleDoOnEvent.call(SingleDoOnEvent.java:25)
    at rx.Single.subscribe(Single.java:1967)
    at rx.Single.subscribe(Single.java:1777)
    at rx.Single.subscribe(Single.java:1747)
Run Code Online (Sandbox Code Playgroud)

我怎样才能删除堆栈跟踪...另外,我确实实现了 onError ?

single 是正确的选择还是应该使用 Observable?

问候,伊多

rx-java

1
推荐指数
1
解决办法
3519
查看次数

rxjava 单元测试 onError 未调用

onCompleted被称为,所以很奇怪

方法:

void load() {
    fetchData().subscribeOn(getIoScheduler())
            .observeOn(getMainThreadScheduler())
            .doOnError(throwable -> System.out.println("doOnError " + throwable))
            .subscribe(new Subscriber<MyObject>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted");
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("onError " + e);
                    handleError();
                }

                @Override
                public void onNext(MyObject myObject) {
                    handleSuccess(myObject);
                }
            });
 }
Run Code Online (Sandbox Code Playgroud)

考试:

@Test
public void load() {
        Mockito.doReturn(Schedulers.immediate())
                .when(presenter)
                .getMainThreadScheduler();
        Mockito.doReturn(Schedulers.immediate())
                .when(presenter)
                .getIoScheduler();

        // a) Success case
MyObject object = Mockito.mock(MyObject.class);
        Mockito.doReturn(Observable.just(object))
                .when(presenter)
                .fetchData();

        presenter.load();

        Mockito.verify(presenter)
                .fetchData();
        Mockito.verify(presenter)
                .handleSuccess(object);
        Mockito.reset(presenter);

        // b) Error case
        Mockito.doReturn(Observable.error(new …
Run Code Online (Sandbox Code Playgroud)

junit android mockito rx-java

1
推荐指数
1
解决办法
3838
查看次数

RxJava - 主要杀死可观察的线程

为什么主线程杀死了我的 rxJava 线程?

public static void main(final String[] args) throws Exception {
    Observable.just(10)
        .subscribeOn(Schedulers.newThread())
        .subscribe(i -> print(i));
    Thread.sleep(100);
}

private static void print(final int i) {
    try {
        Thread.sleep(5000);
    } catch(final InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(i);
}
Run Code Online (Sandbox Code Playgroud)

print方法阻塞线程 5000 毫秒,我认为 JVM 正在等待应用程序下的所有线程被终止。在这种情况下,执行程序后关闭,我在控制台中看Thread.sleep(100)不到。10

注意:如果我将使用自定义执行器,那么Executors.newFixedThreadPool(1);它将等到关闭,但使用Schedulers.newThread()它则不会。

java multithreading rx-java

1
推荐指数
1
解决办法
841
查看次数

RX java io.reactivex.rxjava3.android.schedulers.AndroidSchedulers 无法执行

实现 'io.reactivex.rxjava3:rxandroid:3.0.0' 实现 'io.reactivex.rxjava3:rxjava:3.0.0'

val TAG:String = RXKotlinDemoClass::class.java.simpleName

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main)

    var observable = Observable.just("Goat","Dog","Cow")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()).subscribe({
        value -> println(TAG+"$value")
    },{
        error -> println(TAG+"$error")
    },{
        println(TAG+"onComplete")
    }

    )
Run Code Online (Sandbox Code Playgroud)

}

异常:java.lang.NoSuchMethodError:没有静态方法元工厂(Ljava/lang/invoke/MethodHandles$Lookup;Ljava/lang/String;Ljava/lang/invoke/MethodType;Ljava/lang/invoke/MethodType;Ljava/lang/invoke /MethodHandle;Ljava/lang/invoke/MethodType;)Ljava/lang/invoke/CallSite; 在类 Ljava/lang/invoke/LambdaMetafactory 中;或其超类(“java.lang.invoke.LambdaMetafactory”的声明出现在 /apex/com.android.runtime/javalib/core-oj.jar 中)位于 io.reactivex.rxjava3.android.schedulers.AndroidSchedulers.(AndroidSchedulers .java:33) 在 io.reactivex.rxjava3.android.schedulers.AndroidSchedulers.mainThread(AndroidSchedulers.java:44) 在 com.android.myfirstapp.RXKotlinDemoClass.onCreate(RXKotlinDemoClass.kt:19) 在 android.app.Activity。在 android.app.Activity.performCreate(Activity.java:7791) 处执行Create(Activity.java:7802) 在 android.app.Instrumentation.callActivityOnCreate(Instrumentation.java:1299) 在 android.app.ActivityThread.performLaunchActivity(ActivityThread.java :3245)在android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:3409)在android.app.servertransaction.LaunchActivityItem.execute(LaunchActivityItem.java:83)在android.app.servertransaction.TransactionExecutor.executeCallbacks(TransactionExecutor.java: 135) 在 android.app.servertransaction.TransactionExecutor.execute(TransactionExecutor.java:95) 在 android.app.ActivityThread$H.handleMessage(ActivityThread.java:2016) 在 android.os.Handler.dispatchMessage(Handler.java:107) )在 android.os.Looper.loop(Looper.java:214) 在 android.app.ActivityThread.main(ActivityThread.java:7356) 在 java.lang.reflect.Method.invoke(Native Method) 在 com.android。 Internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:492) 在 com.android.internal.os.ZygoteInit.main(ZygoteInit.java:930)

rx-java rx-android kotlin-android-extensions

1
推荐指数
1
解决办法
5343
查看次数

RxJava:单打列表

我在使用 RxJava 时遇到了一些麻烦。我正在使用 Kotlin 编码。这是我的问题:
我有一份单身人士名单。现在我需要所有 Singles 的发出结果才能继续。如果单打比赛能够并行进行并且结果保持相同的顺序,那就太好了。当所有单身人士都公布了他们的结果时,我想继续。

val list_of_singles = mutableListOf<Single<Type>>()
val results: List<ResultType> = runSingles(list_of_singles)
// use results here...
Run Code Online (Sandbox Code Playgroud)

如果您需要更多信息,请与我们联系。

谢谢!!!:)

kotlin rx-java rx-java2

1
推荐指数
1
解决办法
3078
查看次数

RxJava3。为什么 FlowableSubscriber onNext 没有被调用?

我需要获取订阅对象才能有机会取消订阅侦听器。为此,我想提供一个 FlowableSubscriber 来运行。

代码:

FlowableSubscriber fs = new FlowableSubscriber() {
        @Override
        public void onSubscribe(@NonNull Subscription s) {
            System.out.println("Flowable onSubs");
        }

        @Override
        public void onNext(Object o) {
            System.out.println("Flowable onNext");
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("Flowable onErr");
        }

        @Override
        public void onComplete() {
            System.out.println("Flowable onComlet");
        }
    };
Run Code Online (Sandbox Code Playgroud)

日志是:

Running...
Flowable onSubs
Run Code Online (Sandbox Code Playgroud)

如果我使用 lambda,它可以工作,但没有 onSubscribe 回调。

我怎样才能获得订阅以及为什么这些方法没有被调用?

java rx-java

1
推荐指数
1
解决办法
466
查看次数