标签: rx-java

将参数传递给Observable.create

我在Android上使用RXJava异步访问数据库.

我想在我的数据库中保存一个对象.通过这种方式,我创建了一个方法,它接受一个最终参数(我想要保存的对象)并返回一个Observable.

在这一点上,我不在乎发射任何东西,所以我最后会打电话subscriber.onComplete().

这是我的代码:

public Observable saveEventLog(@NonNull final EventLog eventLog) {
    return Observable.create(new Observable.OnSubscribe<Object>() {
        @Override
        public void call(Subscriber<? super Object> subscriber) {
            DBEventLog log = new DBEventLog(eventLog);
            log.save();
            subscriber.onCompleted();
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

问题是,我看到许多人使用参数的final关键字回答,但我想在没有它的情况下这样做.原因是我不喜欢声明一个final变量的方法,以便在另一个线程中使用它.

还有其他选择吗?谢谢.

android reactive-programming rx-java

1
推荐指数
1
解决办法
2342
查看次数

引起:rx.exceptions.MissingBackpressureException

我还有一个问题。这次我Caused by: rx.exceptions.MissingBackpressureException在执行这段代码时遇到了这个错误:

class UpdateHelper {
val numberOfFileToUpdate: PublishSubject<Int>

init {
    numberOfFileToUpdate = PublishSubject.create()
}

public fun startUpdate(): Observable<Int>{
    return getProducts().flatMap { products: ArrayList<Product> ->
            numberOfFileToUpdate.onNext(products.size)
            return@flatMap saveRows(products)
        }
}

private fun getProducts(): Observable<ArrayList<Product>> {
    return Observable.create {
        var products: ArrayList<Product> = ArrayList()
        var i = 0
        while (i++ < 100) {
            products.add(Product())
        }

        it.onNext(products)
        it.onCompleted()
    }
}


private fun saveRows(products: ArrayList<Product>): Observable<Int> {
    return Observable.create<Int> {
        var totalNumberOfRow = products.size

        while (totalNumberOfRow-- > 0){
            it.onNext(products.size - totalNumberOfRow) …
Run Code Online (Sandbox Code Playgroud)

android kotlin rx-java

1
推荐指数
1
解决办法
1871
查看次数

在android中使用AsyncTask和Activity的RxJava

我有一个小时的使用RxJava的经验,我试图在我的项目中实现它,而不是使用接口和监听器.

我有一个异步任务,它在一个单独的模块中调用谷歌云端点方法,并List<Profile>在完成时收到.

onPostExecute()异步任务的方法中,我调用onNext以便任何订阅者都能接收到这些数据.

这是AsyncTask看起来像:

private BirthpayApi mApi;
private String mUserId;
private ReplaySubject<List<Profile>> notifier = ReplaySubject.create();

public GetFriends(String userId) {
    mUserId = userId;
}

public Observable<List<Profile>> asObservable() {
    return notifier;
}

@Override
protected List<Profile> doInBackground(Void... params) {
    if (mApi == null) {
        BirthpayApi.Builder builder = new BirthpayApi.Builder(AndroidHttp.newCompatibleTransport(),
                    new AndroidJsonFactory(), null)
                    // options for running against local devappserver
                    // - 10.0.2.2 is localhost's IP address in Android emulator
                    // - turn off compression when running against …
Run Code Online (Sandbox Code Playgroud)

java android android-asynctask rx-java

1
推荐指数
1
解决办法
764
查看次数

RxJava 合并订阅者仅从第一个 observable 中获取结果

我试图了解 rxjava 合并是如何工作的。所以这里是简单的代码,它应该合并 2 个 observables 的结果并发送给订阅者

    Observable.merge(getObservable(), getTimedObservable())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override public void call(final String s) {
                        Log.i("test", s);
                    }
                });

    private Observable<String> getTimedObservable() {
        return Observable.interval(150, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, String>() {
                    @Override public String call(final Long aLong) {
                        Log.i("test", "tick thread: " + Thread.currentThread().getId());
                        return String.valueOf(aLong);
                    }
                });
    }

    public Observable<String> getObservable() {
        return  Observable.create(new Observable.OnSubscribe<String>() {
            @Override public void call(final Subscriber<? super String> subscriber) {
                try {
                    Log.i("test", "simple observable thread: " + Thread.currentThread().getId()); …
Run Code Online (Sandbox Code Playgroud)

rx-java

1
推荐指数
1
解决办法
898
查看次数

RX Java - 重试一些引发异常的代码

我正在尝试使用 RX Java 来使用来自不断发送对象的源的一些数据。

我想知道如何为我自己的代码抛出异常的情况实施重试策略。例如,网络异常应该使用指数退避策略触发重试。

一些代码:

 message.map(this::processMessage)
                 .subscribe((message)->{
                     //do something after mapping
                 });
Run Code Online (Sandbox Code Playgroud)

processMessage(message) 是包含可能失败的风险代码的方法,以及我想重试的代码部分,但我不想阻止 observable 消耗来自源的数据。

对此有何想法?

java error-handling observable rx-java exponential-backoff

1
推荐指数
1
解决办法
7043
查看次数

Observable.zip选择不同的调度程序来订阅不同的呼叫

我有两个观测值o1和o2。我通过Observable.zip()函数压缩它们,但是每次都在不同的调度程序上进行订阅。我希望所有o1,o2和z observable都应该在Schedulers.io()上可订阅。但是每次都是Schedulers.io()或Schedulers.computation()是随机的。

这是我的重现问题的源代码

import rx.Observable;
import rx.schedulers.Schedulers;

public class RxZipSchedulers {

    public static void main(String[] args) {

        for(int i=0;i<100;i++) {
            Observable<String> o1 = Observable.just("o1").subscribeOn(Schedulers.computation());
            Observable<String> o2 = Observable.just("o2");

            Observable z = Observable.zip(o1, o2, (s1, s2) -> s1 + " " + s2 + " " + Thread.currentThread());

            z.subscribeOn(Schedulers.io())
                    .subscribe(res -> {
                        System.out.println(res);
                    });

            z.toCompletable().await();
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

在我的机器上,输出是这样的(每次都要注意RxComputationScheduler或RxIoScheduler):

o1 o2线程[RxComputationScheduler-1,5,main]

o1 o2线程[RxComputationScheduler-4,5,main]

o1 o2线程[RxComputationScheduler-1,5,main]

o1 o2线程[RxComputationScheduler-3,5,main]

o1 o2线程[RxComputationScheduler-4,5,main]

o1 o2线程[RxComputationScheduler-3,5,main]

o1 o2线程[RxIoScheduler-3,5,main]

o1 o2线程[RxIoScheduler-2,5,main]

o1 o2线程[RxComputationScheduler-1,5,main]

o1 o2线程[RxComputationScheduler-3,5,main]

o1 o2线程[RxIoScheduler-3,5,main] …

java rx-java reactivex

1
推荐指数
1
解决办法
670
查看次数

Android rxjava2 Flowable with Compositedisposable

我对 Flowables 有问题并将它们添加到复合材料中。我想从 Observable 切换到 Flowable,因为该操作可能会发出 1000 个或更多的值。我对 rxjava2 有点缺乏经验,所以如果这个问题很愚蠢,请原谅我:)

到目前为止,我使用了这样的 observable:

 public Observable<String> uploadPictureRx(String path)
    {
        return Observable.create(new ObservableOnSubscribe<String>()
        {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception
            {
                Uri file = Uri.fromFile(new File(path));
                String segment = file.getLastPathSegment();
                UploadTask uploadTask = reference.child("SomeChild").child(segment).putFile(file);


                uploadTask.addOnFailureListener(new OnFailureListener()
                {
                    @Override
                    public void onFailure(@NonNull Exception exception)
                    {
                        e.onError(exception);
                    }
                }).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
                {
                    @Override
                    public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
                    {                           
                        //noinspection VisibleForTests
                        downloadUrl = taskSnapshot.getDownloadUrl();
                        String url = downloadUrl.getPath();
                        e.onNext(url);
                        e.onComplete();
                    }
                }).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
                {
                    @Override
                    public …
Run Code Online (Sandbox Code Playgroud)

android observable rx-java

1
推荐指数
2
解决办法
6728
查看次数

android rxjava重复请求

我正在努力实现以下目标。我加载了一个对象列表,我想稍后将值放入列表中。

首先,我使用 flatmap 将所有值收集到一个数组中(按山顺序),然​​后当一切完成后,我填充一个适配器。

我无法做的就是每隔 xxx 秒重复一次操作。我理解它是使用间隔完成的。我仍然没有得到任何结果,或者只是没有重复一个结果。

这是我的代码:

  Observable.fromIterable(URLList)
            .concatMap(url -> standartRequest(App.getInstance().getApi().getService().getData(currency.getUrl())))
            .retry(Constants.RETRY_COUNT)
            .timeout(Constants.TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::success, this::error, this::valuesRetrieved); 



    recyclerView = ((CurrencyListFragment) controller).getRecyclerView();
    LinearLayoutManager linearManager = new LinearLayoutManager(controller.getContext());
    recyclerView.setLayoutManager(linearManager);
}


private void valuesRetrieved() {
    listAdapter adapter = new listAdapter(valuesFromResponse);
    recyclerView.setAdapter(adapter);
}

private void success(Object response) {
    valuesFromResponse.add(response);
}
Run Code Online (Sandbox Code Playgroud)

我放在哪里

.interval(5, TimeUnit.SECONDS).timeInterval()
Run Code Online (Sandbox Code Playgroud)

android rx-java

1
推荐指数
1
解决办法
1万
查看次数

我可以跟踪RxJava订阅服务器中的事件消耗吗?

我想跟踪订阅者何时开始使用事件以及何时完成.是否有适用于所有Observables/Subscribers的通用方法?

rx-java rx-java2

1
推荐指数
1
解决办法
590
查看次数

onNext()永远不会在PublishSubject上被调用

我正在尝试构建一个演示器,该演示器在某个时间段内计算一些事件,仅在第一次加载时显示加载,并在完成时更新ui。因为可以通过多种方式(例如用户首选项)来更新事件,所以我需要能够告知演示者事件已更新,并且必须再次刷新它们。这是我现在所拥有的:

                      subject
                            .map<List<UpcomingRowViewModel>> {
                                provider.calculateEventsBetween(TimePeriod.aYearFrom(firstDay))
                            }
                            .doOnSubscribe {
                                view.showLoading()
                            }
                            .observeOn(resultScheduler)
                            .subscribeOn(workScheduler)
                            .subscribe { upcomingRowViewModels ->
                                view.display(upcomingRowViewModels)
                            }
                      subject.onNext(TRIGGER)
Run Code Online (Sandbox Code Playgroud)

主题是PublishSubjectInt。onNext()订阅后我会执行正确的操作,因为我希望在订阅数据后立即刷新数据。

上面的代码在我的单元测试中以及在仅在附有调试器的设备上运行时才起作用。如果我只是运行它(不使用任何调试器),它将到达该view.showLoading()部分,但是永远不会到达,provider.calculateEventsBetween(TimePeriod.aYearFrom(firstDay)因此UI会因加载而“卡住”。

有任何想法吗?

rx-java rx-kotlin publishsubject

1
推荐指数
1
解决办法
1810
查看次数