标签: rx-java

如何在订阅开始时显示对话框?

我想在执行 Observable 时显示 ProgressBar:

Observable<T> observable;
Observer<T> observer;
...................
observable.doOnSubscribe(()->{showProgressBar();}
          .finallyDo(()-> {hideProgressBar();})
          .subscribeOn(Schedulers.newThread())
          .observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);

...................
    protected void showProgressBar() {
        if (mProgressBar != null)
            mProgressBar.setVisibility(View.VISIBLE);
    }
    protected void hideProgressBar() {
        if (mProgressBar != null)
            mProgressBar.setVisibility(View.GONE);
    }
Run Code Online (Sandbox Code Playgroud)

;

但我收到此错误:

 android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views.
Run Code Online (Sandbox Code Playgroud)

在线的 mProgressBar.setVisibility(View.VISIBLE);

如何运行showProgressBar()doOnSubscribe()

java android rx-java rx-android

0
推荐指数
1
解决办法
681
查看次数

将表达式转换为 lambda

任何人都可以帮助我将这段代码转换为 lambda 表达式,我对此感到震惊

Observable.create(new OnSubscribe<User>() {
        @Override
        public void call(Subscriber<? super User> arg0) {
            User updatedUser = userService.updateuser(usermapper.userdtotoentity(user));
            arg0.onNext(updatedUser);
        }
    }).subscribe(new Action1<User>() {
        @Override
        public void call(User user) {
            if (user != null) {
                response.resume(user);
            } else
                response.resume(Response.status(Status.NOT_FOUND).build());
        }
    }, new Action1<Throwable>() {
        @Override
        public void call(Throwable t) {
            logger.debug("User with email_id:" + email_id + " is not present");
            response.resume(t);
        }
    });
Run Code Online (Sandbox Code Playgroud)

lambda throwable subscribe observable rx-java

0
推荐指数
1
解决办法
3381
查看次数

Observable 不是异步的

我正在学习RxJava并正在测试从数据库读取数据然后将其发布到队列的场景。我只是对整个过程做了一个样本模拟,但我似乎没有找到Observable我想要的工作,即。异步。

这是我的代码:

package rxJava;

import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.Observer;
import rx.functions.Action1;

public class TestClass {

    public static void main(String[] args) {

        TestClass test = new TestClass();
        System.out.println("---START---");

        test.getFromDB().subscribe(new Observer<String>() {

            @Override
            public void onCompleted() {
                System.out.println("Publish complete.");
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onNext(String s) {
                test.publishToQueue(s).subscribe(new Observer<Boolean>() {

                    @Override
                    public void onNext(Boolean b) {
                        if (b) {
                            System.out.println("Successfully published.");
                        }
                    }

                    @Override
                    public void onCompleted() { …
Run Code Online (Sandbox Code Playgroud)

java multithreading asynchronous rx-java

0
推荐指数
1
解决办法
2857
查看次数

递归地将Rx单曲组合成Observable

假设我有一个可以从类型中发出元素或者失败的Single调用(在某些语言中会是这样).那是:s_0t_0TSingle<T>

s_0: -- t_0          // Success

OR

s_0: -- X            // Failure
Run Code Online (Sandbox Code Playgroud)

从类型实例T有一个next()返回一个可选的方法Single从式T以及(一个Single<T>?科特林).这种行为导致一系列Single实例能够发出一系列T实例,其中每个实例都s_i可以发出一个t_i能够返回下s_i+1一个单元的元素t_i+1,该元素将发出一个元素,依此类推,直到最后一个元素t_n-1不返回单个或任何一个元素为止单打失败:

s_0: -- t_0
        ?
        s_1: -- t_1
                ?
                s_2: -- t_2

                        ...

                        ?
                        s_n-1: -- t_n-1
                                  ?
                                  null

OR

s_0: -- t_0
        ?
        s_1: -- t_1
                ?
                s_2: -- t_2

                        ...

                        ?
                        s_i: -- X …
Run Code Online (Sandbox Code Playgroud)

reactive-programming kotlin rx-java

0
推荐指数
1
解决办法
897
查看次数

列表中的每个项目的RxJava在Android中使用Retofit发出api请求

我的Android项目中存在以下问题:

我有一个使用Retrofit从上一个网络调用获得的元素列表(ArrayList)。CategoryItem看起来像这样:

public class CategoryItem {
   String id;
   String name;
   public CategoryItem(String id, String name) {
     this.id = id;
     this.name = name;
   }
   public String getId() {
     return id;
   }
   public String getName() {
     return name;
   }
}
Run Code Online (Sandbox Code Playgroud)

实际上,类别由20个元素组成。现在,我需要制作一个网络API,并获取每个类别的产品列表。为此,产品API将类别中的ID作为查询参数。在获得某种类别的产品列表之后,我将其添加到内部SQLite DB中。

我想使用RxJava(1和2)做到这一点。

我到目前为止所做的不是多线程的,而是在MainThread上的,这是不正确的。我将在此处添加代码段:

for (int i = 0; i < categoryItems.size(); i++) {
    CategoryItem categoryItem = categoryItems.get(i);
    final String iCat = categoryItem.getId();
    Observable<ProductResponse> call = networkManager.getApiServiceProductsRx().getProductsRx(iCat);

    Subscription subscription = call
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<ProductResponse>() {
                @Override
                public void onCompleted() {
                } …
Run Code Online (Sandbox Code Playgroud)

android rx-java rx-java2

0
推荐指数
1
解决办法
4253
查看次数

RxJava订阅不起作用

我有以下代码 -

package com.test.rxjava;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.Flowable;

public class App1 {

    public static void main(String[] args) {

        Subscriber<Integer> subscriber = new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
            }

            @Override
            public void onNext(Integer t) {
                System.out.printf("Entry %d\n", t);
            }

            @Override
            public void onError(Throwable t) {
                System.err.printf("Failed to process: %s\n", t);
            }

            @Override
            public void onComplete() {
                System.out.println("Done");
            }

        };
        Flowable.just(123).subscribe(subscriber);

    }

}
Run Code Online (Sandbox Code Playgroud)

我期待在onNext方法中执行代码.但是没有任何反应.但是如果我用下面的代码替换最后一行,我确实得到了输出.

Flowable.just(123).subscribe((t) -> System.out.println(t));
Run Code Online (Sandbox Code Playgroud)

我不确定这里缺少什么.但绝对是肯定的.我是Rx世界的新手,可以帮助找出问题所在.提前致谢!

java reactive-programming rx-java

0
推荐指数
1
解决办法
1857
查看次数

使用rxJava超时实用程序时未获取TimeoutException

如果5秒钟内未输入任何文本,则不会收到TimeoutException。下面的代码方法将调用getMsg()并等待文本输入。我添加了“ timeout(5,TimeUnit.SECONDS)”,仅等待5秒钟进行输入。如果用户在5秒钟内未输入msg,我想显示超时错误。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.TimeUnit;

import rx.Observable;

public class TestRx {

   public static void main( String[] args ) throws IOException {
      Observable.just( getMsg() )
            .timeout( 5, TimeUnit.SECONDS )
            .subscribe( System.out::println,
                  e -> e.printStackTrace() );
      System.out.println( "End..." );
   }

   private static String getMsg() throws IOException {
      BufferedReader reader = new BufferedReader( new InputStreamReader( System.in ) );
      System.out.print( "Enter a msg:" );
      String msg = reader.readLine();
      return msg;
   }
}
Run Code Online (Sandbox Code Playgroud)

rx-java

0
推荐指数
1
解决办法
442
查看次数

为什么blockingSingle崩溃但blockingFirst有效

我只是从应用程序的共享首选项中读取一个值,因此应该只发出1个值。

SharedPrefAsync:

Observable<Boolean> getSomeValue() {
  // retrieve the value asycnhronously
}
Run Code Online (Sandbox Code Playgroud)

用法:
sharedPrefsAsync.getSomeValue().blockingFirst()可行,但sharedPrefsAsync.getSomeValue().blockingSingle()似乎使应用程序崩溃,没有错误日志

我已经阅读了官方文档,但不清楚blockingSingle和之间的区别blockingFirst

如何找出blockingSingle此处使用的实际问题?

android rx-java rx-java2

0
推荐指数
1
解决办法
612
查看次数

使用Kotlin和rx-java combineLatest与表单验证混淆

所以我想用rx-java2进行表单验证.我正在使用Kotlin.我遇到了两个问题.emailObservable和passwordObservable都是类型Disposable!.我尝试通过调用指定类型,val emailObservable: Observable<Boolean>但Android Studio认为它Disposable!.

其次,当我想使用方法时combineLatest出现错误:使用提供的参数不能调用以下任何函数.

emailObservable和passwordObservable都能正常工作.我是rx-java的新手,我对这种类型的东西很困惑.

val emailObservable = RxTextView.afterTextChangeEvents(textEmail)
                .observeOn(AndroidSchedulers.mainThread())
                .map { x -> textEmail.text.length > 3 }
                .subscribe { x -> foo(x) }

val passwordObservable =RxTextView.afterTextChangeEvents(textPassword)
                .observeOn(AndroidSchedulers.mainThread())
                .map { x -> textPassword.text.length > 5 }
                .subscribe { x -> foo(x) }


Observable.combineLatest(emailObservable,
passwordObservable,
BiFunction { x: Boolean, y:Boolean -> x && y })
Run Code Online (Sandbox Code Playgroud)

kotlin rx-java rx-android rx-binding rx-java2

0
推荐指数
1
解决办法
402
查看次数

How to get index / position of list in onnext from Observable.fromIterable Rxjava?

I'm trying to loop a list but i'm not getting the index of current item.

Observable.fromIterable(yourList).observeOn(Schedulers.io())
            .observeOn(Schedulers.io()).subscribe(
                { item -> {        }},
                {_ ->{}},
                {->{}}
Run Code Online (Sandbox Code Playgroud)

Is there any way to get index just like

yourList.forEachIndexed{ index, item -> }
Run Code Online (Sandbox Code Playgroud)

I already know that

class Indexed {
int index;
String element;

Indexed(int index, String element) {
    this.index = index;
    this.element = element;
    }
}
Run Code Online (Sandbox Code Playgroud)

this can be used as a solution. But i don't like this kind of approach. I want …

android kotlin rx-java rx-java2

0
推荐指数
1
解决办法
175
查看次数