我知道当所有项目都发出时,会调用观察者的 OnComplete 。在下面的代码中,我将数据从游标放入 flatMap 运算符中的 ArrayList。我的光标有 100 个条目( c.getCount() 给出 100 ),我的列表大小是 100。 onNext 也被调用 100 次。但 onComplete 没有被调用。我正在 onComplete 中填充列表视图。
static int i = 0;
final List<String> ar = new ArrayList<>();
ListView lv = ...;
ArrayAdapter<String> adapter = ...;
.
.
.
q.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<SqlBrite.Query, Observable<String>>() {
@Override
public Observable<String> call(SqlBrite.Query query) {
Cursor c = query.run();
c.moveToFirst();
Log.d("testApp", String.valueOf(c.getCount())); // prints 100
do {
ar.add(c.getString(0));
} while (c.moveToNext());
Log.d("testApp", String.valueOf(ar.size())); // prints 100
return Observable.from(ar);
}
}).subscribe(new Observer<String>() …Run Code Online (Sandbox Code Playgroud) 我想对流执行一些操作,然后将流分成两个流,然后分别处理它们。
显示问题的示例:
Flowable<SuccessfulObject> stream = Flowable.fromArray(
new SuccessfulObject(true, 0),
new SuccessfulObject(false, 1),
new SuccessfulObject(true, 2));
stream = stream.doOnEach(System.out::println);
Flowable<SuccessfulObject> successful = stream.filter(SuccessfulObject::isSuccess);
Flowable<SuccessfulObject> failed = stream.filter(SuccessfulObject::isFail);
successful.doOnEach(successfulObject -> {/*handle success*/}).subscribe();
failed.doOnEach(successfulObject -> {/*handle fail*/}).subscribe();
Run Code Online (Sandbox Code Playgroud)
班级:
class SuccessfulObject {
private boolean success;
private int id;
public SuccessfulObject(boolean success, int id) {
this.success = success;
this.id = id;
}
public boolean isSuccess() {
return success;
}
public boolean isFail() {
return !success;
}
public void setSuccess(boolean success) {
this.success = success;
} …Run Code Online (Sandbox Code Playgroud) 我们如何处理 vertx HttpClientRequest 中的重定向(302 响应代码)。是否可以让 vertx 本身处理重定向,或者我们必须显式处理。如果需要明确执行,请解释如何执行。
我正在尝试编写一个 java 8 流收集器,它反映了rxjava 缓冲区运算符的功能
我有一个工作代码:
// This will gather numbers 1 to 13 and combine them in groups of
// three while preserving the order even if its a parallel stream.
final List<List<String>> triads = IntStream.range(1, 14)
.parallel()
.boxed()
.map(Object::toString)
.collect(ArrayList::new, accumulator, combiner);
System.out.println(triads.toString())
Run Code Online (Sandbox Code Playgroud)
这里的累加器是这样的:
final BiConsumer<List<List<String>>, String> accumulator = (acc, a) -> {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("Accumulator|");
stringBuilder.append("Before: ").append(acc.toString());
int accumulatorSize = acc.size();
if (accumulatorSize == 0) {
List<String> newList = new ArrayList<>();
newList.add(a);
acc.add(newList); …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 rx java 调用 http 调用。
它返回一个 Single 对象。
这是代码:
service.getEvent(eventId)
.onErrorResumeNext(exception -> Single.error(exception))
.doOnError(throwable -> log.error(throwable.getMessage())
.subscribe(this::handleEvent)
Run Code Online (Sandbox Code Playgroud)
日志打印“doOnError”中预期的日志行,但也打印堆栈跟踪:
ERROR c.b.w.s.Service:161 - Error: Error, eventId: dummy-event
rx.exceptions.OnError
NotImplementedException: Error
at rx.functions.Actions$NotImplemented.call(Actions.java:576)
at rx.functions.Actions$NotImplemented.call(Actions.java:572)
at rx.Single$11.onError(Single.java:1782)
at rx.internal.operators.SingleDoOnEvent$SingleDoOnEventSubscriber.onError(SingleDoOnEvent.java:76)
at rx.Single$1.call(Single.java:460)
at rx.Single$1.call(Single.java:456)
at rx.Single.subscribe(Single.java:1967)
at rx.internal.operators.SingleOperatorOnErrorResumeNext$2.onError(SingleOperatorOnErrorResumeNext.java:69)
at rx.Single$1.call(Single.java:460)
at rx.Single$1.call(Single.java:456)
at rx.Single.subscribe(Single.java:1967)
at rx.internal.operators.SingleOperatorOnErrorResumeNext.call(SingleOperatorOnErrorResumeNext.java:77)
at rx.internal.operators.SingleOperatorOnErrorResumeNext.call(SingleOperatorOnErrorResumeNext.java:23)
at rx.Single.subscribe(Single.java:1967)
at rx.internal.operators.SingleDoOnEvent.call(SingleDoOnEvent.java:40)
at rx.internal.operators.SingleDoOnEvent.call(SingleDoOnEvent.java:25)
at rx.Single.subscribe(Single.java:1967)
at rx.Single.subscribe(Single.java:1777)
at rx.Single.subscribe(Single.java:1747)
Run Code Online (Sandbox Code Playgroud)
我怎样才能删除堆栈跟踪...另外,我确实实现了 onError ?
single 是正确的选择还是应该使用 Observable?
问候,伊多
onCompleted被称为,所以很奇怪
方法:
void load() {
fetchData().subscribeOn(getIoScheduler())
.observeOn(getMainThreadScheduler())
.doOnError(throwable -> System.out.println("doOnError " + throwable))
.subscribe(new Subscriber<MyObject>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError " + e);
handleError();
}
@Override
public void onNext(MyObject myObject) {
handleSuccess(myObject);
}
});
}
Run Code Online (Sandbox Code Playgroud)
考试:
@Test
public void load() {
Mockito.doReturn(Schedulers.immediate())
.when(presenter)
.getMainThreadScheduler();
Mockito.doReturn(Schedulers.immediate())
.when(presenter)
.getIoScheduler();
// a) Success case
MyObject object = Mockito.mock(MyObject.class);
Mockito.doReturn(Observable.just(object))
.when(presenter)
.fetchData();
presenter.load();
Mockito.verify(presenter)
.fetchData();
Mockito.verify(presenter)
.handleSuccess(object);
Mockito.reset(presenter);
// b) Error case
Mockito.doReturn(Observable.error(new …Run Code Online (Sandbox Code Playgroud) 为什么主线程杀死了我的 rxJava 线程?
public static void main(final String[] args) throws Exception {
Observable.just(10)
.subscribeOn(Schedulers.newThread())
.subscribe(i -> print(i));
Thread.sleep(100);
}
private static void print(final int i) {
try {
Thread.sleep(5000);
} catch(final InterruptedException e) {
e.printStackTrace();
}
System.out.println(i);
}
Run Code Online (Sandbox Code Playgroud)
print方法阻塞线程 5000 毫秒,我认为 JVM 正在等待应用程序下的所有线程被终止。在这种情况下,执行程序后关闭,我在控制台中看Thread.sleep(100)不到。10
注意:如果我将使用自定义执行器,那么Executors.newFixedThreadPool(1);它将等到关闭,但使用Schedulers.newThread()它则不会。
实现 'io.reactivex.rxjava3:rxandroid:3.0.0' 实现 'io.reactivex.rxjava3:rxjava:3.0.0'
val TAG:String = RXKotlinDemoClass::class.java.simpleName
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
var observable = Observable.just("Goat","Dog","Cow")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe({
value -> println(TAG+"$value")
},{
error -> println(TAG+"$error")
},{
println(TAG+"onComplete")
}
)
Run Code Online (Sandbox Code Playgroud)
}
异常:java.lang.NoSuchMethodError:没有静态方法元工厂(Ljava/lang/invoke/MethodHandles$Lookup;Ljava/lang/String;Ljava/lang/invoke/MethodType;Ljava/lang/invoke/MethodType;Ljava/lang/invoke /MethodHandle;Ljava/lang/invoke/MethodType;)Ljava/lang/invoke/CallSite; 在类 Ljava/lang/invoke/LambdaMetafactory 中;或其超类(“java.lang.invoke.LambdaMetafactory”的声明出现在 /apex/com.android.runtime/javalib/core-oj.jar 中)位于 io.reactivex.rxjava3.android.schedulers.AndroidSchedulers.(AndroidSchedulers .java:33) 在 io.reactivex.rxjava3.android.schedulers.AndroidSchedulers.mainThread(AndroidSchedulers.java:44) 在 com.android.myfirstapp.RXKotlinDemoClass.onCreate(RXKotlinDemoClass.kt:19) 在 android.app.Activity。在 android.app.Activity.performCreate(Activity.java:7791) 处执行Create(Activity.java:7802) 在 android.app.Instrumentation.callActivityOnCreate(Instrumentation.java:1299) 在 android.app.ActivityThread.performLaunchActivity(ActivityThread.java :3245)在android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:3409)在android.app.servertransaction.LaunchActivityItem.execute(LaunchActivityItem.java:83)在android.app.servertransaction.TransactionExecutor.executeCallbacks(TransactionExecutor.java: 135) 在 android.app.servertransaction.TransactionExecutor.execute(TransactionExecutor.java:95) 在 android.app.ActivityThread$H.handleMessage(ActivityThread.java:2016) 在 android.os.Handler.dispatchMessage(Handler.java:107) )在 android.os.Looper.loop(Looper.java:214) 在 android.app.ActivityThread.main(ActivityThread.java:7356) 在 java.lang.reflect.Method.invoke(Native Method) 在 com.android。 Internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:492) 在 com.android.internal.os.ZygoteInit.main(ZygoteInit.java:930)
我在使用 RxJava 时遇到了一些麻烦。我正在使用 Kotlin 编码。这是我的问题:
我有一份单身人士名单。现在我需要所有 Singles 的发出结果才能继续。如果单打比赛能够并行进行并且结果保持相同的顺序,那就太好了。当所有单身人士都公布了他们的结果时,我想继续。
val list_of_singles = mutableListOf<Single<Type>>()
val results: List<ResultType> = runSingles(list_of_singles)
// use results here...
Run Code Online (Sandbox Code Playgroud)
如果您需要更多信息,请与我们联系。
谢谢!!!:)
我需要获取订阅对象才能有机会取消订阅侦听器。为此,我想提供一个 FlowableSubscriber 来运行。
代码:
FlowableSubscriber fs = new FlowableSubscriber() {
@Override
public void onSubscribe(@NonNull Subscription s) {
System.out.println("Flowable onSubs");
}
@Override
public void onNext(Object o) {
System.out.println("Flowable onNext");
}
@Override
public void onError(Throwable t) {
System.out.println("Flowable onErr");
}
@Override
public void onComplete() {
System.out.println("Flowable onComlet");
}
};
Run Code Online (Sandbox Code Playgroud)
日志是:
Running...
Flowable onSubs
Run Code Online (Sandbox Code Playgroud)
如果我使用 lambda,它可以工作,但没有 onSubscribe 回调。
我怎样才能获得订阅以及为什么这些方法没有被调用?