我正在尝试创建一个Observable这样的,它将在一个时间间隔内从网络加载一些数据,并在用户刷新页面时.这是我到目前为止的要点:
PublishSubject<Long> refreshSubject = PublishSubject.create();
Observable<MyDataType> observable = Observable.merge(
Observable.interval(0, 3, TimeUnit.SECONDS),
refreshSubject
)
.flatMap(t -> {
// network operations that eventually return a value
// these operations are not observables themselves
// they are fully blocking network operations
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
// update ui with data
}, error -> {
// do something with error
});
Run Code Online (Sandbox Code Playgroud)
后来在刷新回调中我有:
refreshSubject.onNext(0L);
Run Code Online (Sandbox Code Playgroud)
它在间隔很好的情况下运行,但是当我刷新时,它会爆炸NetworkOnMainThreadException.我以为我用subscribeOn/ 处理了这个observeOn.我错过了什么?另外,为什么当Observer从间隔触发时这不会导致崩溃?
我正在运行RxJava并创建一个使用onNext()方法来生成数据的主题.我正在使用Spring.
这是我的设置:
@Component
public class SubjectObserver {
private SerializedSubject<SomeObj, SomeObj> safeSource;
public SubjectObserver() {
safeSource = PublishSubject.<SomeObj>create().toSerialized();
**safeSource.subscribeOn(<my taskthreadExecutor>);**
**safeSource.observeOn(<my taskthreadExecutor>);**
safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
@Override
public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
LOGGER.debug("{} invoked.", Thread.currentThread().getName());
doSomething();
}
}
}
public void publish(SomeObj myObj) {
safeSource.onNext(myObj);
}
}
Run Code Online (Sandbox Code Playgroud)
在RxJava流上生成新数据的方式是通过@Autowire private SubjectObserver subjectObserver
然后调用subjectObserver.publish(newDataObjGenerated)
无论我为subscribeOn()&指定什么observeOn():
其中onNext()的实际工作是在实际调用onNext()主题以生成/生成数据的同一线程上完成的.
它是否正确?如果是这样,我错过了什么?我期待在doSomething()不同的线程上完成.
更新
在我的调用类中,如果我改变了调用publish方法的方式,那么当然会为订阅者分配一个新线程来运行.
taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj)); …Run Code Online (Sandbox Code Playgroud) 我无法让Observer.onErrorResumeNext按照我的预期行事.
Subscriber orchestratorObserver = new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("orchestratorObserver."+"onCompleted()");
}
@Override
public void onError(Throwable e) {
System.out.println("orchestratorObserver."+"onError()"+e.getMessage());
}
@Override
public void onNext(Integer i) {
System.out.println("orchestratorObserver."+"onNext() : "+i);
}
};
@Test
public void rxTest() {
Observable.range(0,5)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
throw new RuntimeException("chain error!!");
}
})
.onErrorResumeNext(Observable.just(-1))
.subscribe(orchestratorObserver);
}
@Test
public void rxTest1() {
final Observable<Integer> errorTrainObservable = Observable.defer(
new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return Observable.error(new RuntimeException("source error"));
} …Run Code Online (Sandbox Code Playgroud) 为什么RxJava 1.x flatMap()运算符是通过merge实现的?
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func));
}
Run Code Online (Sandbox Code Playgroud)
从flatMap()调用中,我只能返回一个符合的Observable <? extends Observable<? extends R>>。比map(func)调用将它包装到另一个Observable中,这样我们就得到了这样的东西Observable<? extends Observable<? extends R>>。这使我认为map(func)之后的merge()调用是不必要的。
据说merge()运算符执行以下操作:
将可发射Observable的Observable展平为单个Observable,该Observable发射那些Observable发射的项目,而无需进行任何转换。
现在,在平面地图内,我们只能有一个Observable发出一个Observable。为什么要合并?我在这里想念什么?
谢谢。
我有一个包含一组对象的arraylist。我想使用rxjava,这样我就可以通过onSubscribe方法循环遍历列表,而不是一次获取整个列表,而是一次获取每个列表项
如何在RxJava2中链接要批量处理的流程。下面的流程图是我想要实现的。
Flowable#1 Flowable#2 (process every 10)
============== ================================
callServer(p1) ->
: -> saveToDatabase(List<r1 to r10>)
callServer(p20)-> saveToDatabase(List<r11 to r20>)
callServer(p21)-> :
: :
callServer(p35)-> saveToDatabase(List<r31 to r35>) //the remainder
Run Code Online (Sandbox Code Playgroud)
当前,我要等待所有结果返回,然后再保存到数据库中。
Flowable.fromIterable(paramList)
.map(p -> callServer(p))
//wait for the return a map of ALL the results r
//how to chain it such that saveToDatabase process after 'n' results
.toList()
.flatmap(listOfR -> saveToDatabase(listOfR);
Run Code Online (Sandbox Code Playgroud)
我如何做到在每个“ n”个结果之后调用saveToDatabase而不是等待所有结果完成?
我有一个情况,一个长时间运行的进程被包装在一个Observable.fromCallable().这个过程是一个OkHttp调用,如果终止,将抛出一个IOException.如果订阅了observable,则将一次性存储在a中CompositeDisposable,并按预期处理异常.但是,CompositeDisposable在某些情况下,我的代码将清除,在OkHttp没有错误处理的情况下触发线程终止,导致应用程序因未处理的异常而崩溃.这是这个问题的简单单元测试示例:
@Test
public void test(){
CompositeDisposable compositeDisposable = new CompositeDisposable();
Observable<Object> o = Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
System.out.println("sleeping - this sleep will be interrupted when compositeDisposable gets cleared");
Thread.sleep(3000);
return null;
}
});
compositeDisposable.add(o.subscribeOn(new IoScheduler()).subscribe());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
compositeDisposable.clear();
}
Run Code Online (Sandbox Code Playgroud)
有没有办法解决这个问题?
我创建了一个简单IntentService的文件和一些数据上传到服务器。我希望能够Toast在上传完成时显示,但是我需要在主线程上才能执行此操作。
由于我将RetroFit与RxJava结合使用来处理实际请求,因此我认为我应该使用该observeOn(AndroidSchedulers.mainThread())方法Toast在主线程上创建。问题是(由于服务器的原因)我可能不得不重新发送请求,在这种情况下,我必须postRequest()再次调用该方法。
然后,此新请求现在位于主线程上。因此,为了避免使用该subscribeOn(Schedulers.io())方法,考虑到Service已经在自己的线程上,这似乎是一种浪费。
是否有指定的方式Observable应该subscribeOn()的Service线程?还是应该仅继承子类Service而不IntentService使用io线程?
private void postRequest(String title, LatLng location,
String description, MultipartBody.Part attachment) {
mNetworkService.postRequest(title, location.latitude, location.longitude,
description, attachment).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(response -> {
if (response.status() == 1) {
ToastUtil.makeText(getApplicationContext(), "Request Sent");
stopSelf();
} else {
postRequest(title, location, description, attachment);
}
});
}
Run Code Online (Sandbox Code Playgroud) 我有一个使用RxJava 2的简单应用程序:
public static void main(final String[] args) {
final Scheduler scheduler = Schedulers.from(Executors.newCachedThreadPool());
final Observable<String> ticker = Observable.interval(1L, TimeUnit.SECONDS)
.take(10)
.subscribeOn(scheduler)
.map(x -> x + "s");
ticker.subscribe(x -> {
System.out.println(x);
});
}
Run Code Online (Sandbox Code Playgroud)
它正确打印定时器10次:
0s
1s
2s
3s
4s
5s
6s
7s
8s
9s
Run Code Online (Sandbox Code Playgroud)
但是,应用程序之后不会终止9s.似乎有一些线程让它保持活力.
我应该如何实现这一点,以便应用程序在ticker完成后终止?
我想在用户点击按钮时调用请求.但是当第一次请求没有响应时,用户想要更改参数并使用相同的URL调用新请求.
问题 如何取消第一个请求并使用相同的URL调用新请求.
ServiceFactory
.createService()
.getSomething(page)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DefaultObserver<SomthingResponse>() {
@Override
public void onNext(@NonNull SomthingResponse response) {
showSomthing()
hideProgressDialog();
}
@Override
public void onError(@NonNull Throwable e) {
hideProgressDialog();
showErrorMessage(e.getMessage());
}
@Override
public void onComplete() {
}
});
Run Code Online (Sandbox Code Playgroud)
当用户点击按钮时,用户会将页面参数发送到查询字符串,但此请求尚未完成.用户将更改为页面参数并发送新请求.我想取消第一个请求.
谢谢