标签: rx-java

RxAndroid Observable在意外的线程上运行

我正在尝试创建一个Observable这样的,它将在一个时间间隔内从网络加载一些数据,并在用户刷新页面时.这是我到目前为止的要点:

PublishSubject<Long> refreshSubject = PublishSubject.create();
Observable<MyDataType> observable = Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.flatMap(t -> {
    // network operations that eventually return a value
    // these operations are not observables themselves
    // they are fully blocking network operations
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    // update ui with data
}, error -> {
    // do something with error
});
Run Code Online (Sandbox Code Playgroud)

后来在刷新回调中我有:

refreshSubject.onNext(0L);
Run Code Online (Sandbox Code Playgroud)

它在间隔很好的情况下运行,但是当我刷新时,它会爆炸NetworkOnMainThreadException.我以为我用subscribeOn/ 处理了这个observeOn.我错过了什么?另外,为什么当Observer从间隔触发时这不会导致崩溃?

android reactive-programming rx-java rx-android

0
推荐指数
1
解决办法
112
查看次数

在不同的线程rxJava上运行PublishSubject

我正在运行RxJava并创建一个使用onNext()方法来生成数据的主题.我正在使用Spring.

这是我的设置:

@Component
public class SubjectObserver {
    private SerializedSubject<SomeObj, SomeObj> safeSource;
    public SubjectObserver() {
       safeSource = PublishSubject.<SomeObj>create().toSerialized();
       **safeSource.subscribeOn(<my taskthreadExecutor>);**
       **safeSource.observeOn(<my taskthreadExecutor>);** 
       safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
          @Override
          public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
            LOGGER.debug("{} invoked.", Thread.currentThread().getName());
            doSomething();
          }
      }
    }
    public void publish(SomeObj myObj) {
        safeSource.onNext(myObj);
    }
}
Run Code Online (Sandbox Code Playgroud)

在RxJava流上生成新数据的方式是通过@Autowire private SubjectObserver subjectObserver 然后调用subjectObserver.publish(newDataObjGenerated)

无论我为subscribeOn()&指定什么observeOn():

  • Schedulers.io()
  • Schedulers.computation()
  • 我的主题
  • Schedulers.newThread

其中onNext()的实际工作是在实际调用onNext()主题以生成/生成数据的同一线程上完成的.

它是否正确?如果是这样,我错过了什么?我期待在doSomething()不同的线程上完成.

更新

在我的调用类中,如果我改变了调用publish方法的方式,那么当然会为订阅者分配一个新线程来运行.

taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj)); …
Run Code Online (Sandbox Code Playgroud)

spring subject-observer rx-java

0
推荐指数
1
解决办法
1991
查看次数

onErrorResumeNext的rxjava行为

我无法让Observer.onErrorResumeNext按照我的预期行事.

Subscriber orchestratorObserver = new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        System.out.println("orchestratorObserver."+"onCompleted()");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("orchestratorObserver."+"onError()"+e.getMessage());
    }

    @Override
    public void onNext(Integer i) {
        System.out.println("orchestratorObserver."+"onNext() : "+i);
    }
};

@Test
public void rxTest() {
    Observable.range(0,5)
            .doOnNext(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    throw new RuntimeException("chain error!!");
                }
            })
            .onErrorResumeNext(Observable.just(-1))
            .subscribe(orchestratorObserver);
}

@Test
public void rxTest1() {

    final Observable<Integer> errorTrainObservable = Observable.defer(
            new Func0<Observable<Integer>>() {
                @Override
                public Observable<Integer> call() {
                    return Observable.error(new RuntimeException("source error"));
                } …
Run Code Online (Sandbox Code Playgroud)

android system.reactive rx-java

0
推荐指数
1
解决办法
884
查看次数

为什么在RxJava中通过merge实现flatMap?

为什么RxJava 1.x flatMap()运算符是通过merge实现的?

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));
    }
Run Code Online (Sandbox Code Playgroud)

从flatMap()调用中,我只能返回一个符合的Observable <? extends Observable<? extends R>>。比map(func)调用将它包装到另一个Observable中,这样我们就得到了这样的东西Observable<? extends Observable<? extends R>>。这使我认为map(func)之后的merge()调用是不必要的。

据说merge()运算符执行以下操作:

将可发射Observable的Observable展平为单个Observable,该Observable发射那些Observable发射的项目,而无需进行任何转换。

现在,在平面地图内,我们只能有一个Observable发出一个Observable。为什么要合并?我在这里想念什么?

谢谢。

rx-java flatmap rx-android

0
推荐指数
1
解决办法
1324
查看次数

使用rxjava2遍历arraylist

我有一个包含一组对象的arraylist。我想使用rxjava,这样我就可以通过onSubscribe方法循环遍历列表,而不是一次获取整个列表,而是一次获取每个列表项

java android arraylist rx-java rx-java2

0
推荐指数
1
解决办法
1425
查看次数

如何在RxJava2的链中批量处理Flowable输出

如何在RxJava2中链接要批量处理的流程。下面的流程图是我想要实现的。

Flowable#1          Flowable#2 (process every 10)
==============     ================================
callServer(p1) ->
      :        ->  saveToDatabase(List<r1 to r10>)
callServer(p20)->  saveToDatabase(List<r11 to r20>)
callServer(p21)->      :
      :                :
callServer(p35)->  saveToDatabase(List<r31 to r35>) //the remainder
Run Code Online (Sandbox Code Playgroud)

当前,我要等待所有结果返回,然后再保存到数据库中。

Flowable.fromIterable(paramList)
    .map(p -> callServer(p)) 
    //wait for the return a map of ALL the results r  
    //how to chain it such that saveToDatabase process after 'n' results 
    .toList()  
    .flatmap(listOfR -> saveToDatabase(listOfR); 
Run Code Online (Sandbox Code Playgroud)

我如何做到在每个“ n”个结果之后调用saveToDatabase而不是等待所有结果完成?

rx-java rx-java2

0
推荐指数
1
解决办法
854
查看次数

订阅被清除时,处理Observable.fromCallable()中的异常

我有一个情况,一个长时间运行的进程被包装在一个Observable.fromCallable().这个过程是一个OkHttp调用,如果终止,将抛出一个IOException.如果订阅了observable,则将一次性存储在a中CompositeDisposable,并按预期处理异常.但是,CompositeDisposable在某些情况下,我的代码将清除,在OkHttp没有错误处理的情况下触发线程终止,导致应用程序因未处理的异常而崩溃.这是这个问题的简单单元测试示例:

@Test
public void test(){
    CompositeDisposable compositeDisposable = new CompositeDisposable();
    Observable<Object> o = Observable.fromCallable(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            System.out.println("sleeping - this sleep will be interrupted when compositeDisposable gets cleared");
            Thread.sleep(3000);
            return null;
        }
    });
    compositeDisposable.add(o.subscribeOn(new IoScheduler()).subscribe());
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    compositeDisposable.clear();
}
Run Code Online (Sandbox Code Playgroud)

有没有办法解决这个问题?

android rx-java rx-java2

0
推荐指数
1
解决办法
1507
查看次数

RxJava在服务线程上订阅

我创建了一个简单IntentService的文件和一些数据上传到服务器。我希望能够Toast在上传完成时显示,但是我需要在主线程上才能执行此操作。

由于我将RetroFit与RxJava结合使用来处理实际请求,因此我认为我应该使用该observeOn(AndroidSchedulers.mainThread())方法Toast在主线程上创建。问题是(由于服务器的原因)我可能不得不重新发送请求,在这种情况下,我必须postRequest()再次调用该方法。

然后,此新请求现在位于主线程上。因此,为了避免使用该subscribeOn(Schedulers.io())方法,考虑到Service已经在自己的线程上,这似乎是一种浪费。

是否有指定的方式Observable应该subscribeOn()Service线程?还是应该仅继承子类Service而不IntentService使用io线程?

private void postRequest(String title, LatLng location,
                         String description, MultipartBody.Part attachment) {
    mNetworkService.postRequest(title, location.latitude, location.longitude,
            description, attachment).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(response -> {
                if (response.status() == 1) {
                    ToastUtil.makeText(getApplicationContext(), "Request Sent");
                    stopSelf();
                } else {
                    postRequest(title, location, description, attachment);
                }
            });
}
Run Code Online (Sandbox Code Playgroud)

android android-service rx-java rx-android

0
推荐指数
1
解决办法
1118
查看次数

为什么我的RxJava计时器应用程序没有终止?

我有一个使用RxJava 2的简单应用程序:

public static void main(final String[] args) {

    final Scheduler scheduler = Schedulers.from(Executors.newCachedThreadPool());

    final Observable<String> ticker = Observable.interval(1L, TimeUnit.SECONDS)
        .take(10)
        .subscribeOn(scheduler)
        .map(x -> x + "s");

    ticker.subscribe(x -> {
        System.out.println(x);
    });
}
Run Code Online (Sandbox Code Playgroud)

它正确打印定时器10次:

0s
1s
2s
3s
4s
5s
6s
7s
8s
9s
Run Code Online (Sandbox Code Playgroud)

但是,应用程序之后不会终止9s.似乎有一些线程让它保持活力.

我应该如何实现这一点,以便应用程序在ticker完成后终止?

java multithreading rx-java rx-java2

0
推荐指数
1
解决办法
168
查看次数

当我请求相同的url时,如何在retrofix2.0(rxandroid)中取消请求

我想在用户点击按钮时调用请求.但是当第一次请求没有响应时,用户想要更改参数并使用相同的URL调用新请求.

问题 如何取消第一个请求并使用相同的URL调用新请求.

 ServiceFactory
            .createService()
            .getSomething(page)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DefaultObserver<SomthingResponse>() {
                @Override
                public void onNext(@NonNull SomthingResponse response) {
                    showSomthing()
                    hideProgressDialog();

                }

                @Override
                public void onError(@NonNull Throwable e) {
                    hideProgressDialog();
                    showErrorMessage(e.getMessage());
                }

                @Override
                public void onComplete() {

                }
            });
Run Code Online (Sandbox Code Playgroud)

当用户点击按钮时,用户会将页面参数发送到查询字符串,但此请求尚未完成.用户将更改为页面参数并发送新请求.我想取消第一个请求.

谢谢

android rx-java rx-android retrofit2 rx-java2

0
推荐指数
1
解决办法
403
查看次数