标签: rx-java

RX Java with Retrofit-2 VS Normal Retrofit-2

我目前Retrofit2用于API解析。因为我被要求用RxJava+Retrofit为我的新应用程序更改它。我怎样才能做到这一点。什么是使用的好处RxJava沿Retrofit

任何帮助都应该受到高度赞赏。

下面是我用于正常Retrofit解析的代码

Retrofit.Builder builder =new Retrofit.Builder().baseUrl(API_BASE_URL).client(httpClient).addConverterFactory(GsonConverterFactory.create());
Retrofit retrofit = builder.client(httpClient).build();
return retrofit.create(serviceClass);
Run Code Online (Sandbox Code Playgroud)

android android-layout rx-java retrofit2

1
推荐指数
1
解决办法
720
查看次数

在 RXJava 调用之前未显示进度对话框

我有以下代码,在我执行长时间运行的任务之前,我启动了一个环形对话框,在 RxJava2 中完成。问题是对话框没有显示,我认为我没有阻塞主 UI 线程。

  fab.setOnClickListener(new View.OnClickListener() {
        @Override
        public void onClick(View view) {
            if (ringProgressDialog != null) {
                if (ringProgressDialog.isShowing()) {
                    ringProgressDialog.dismiss();
                }
            }
            ringProgressDialog = ProgressDialog.show(SendConversationsActivity.this,
                    getResources().getString(R.string.creating_document_progress_dialog_title),
                    getResources().getString(R.string.conversation_progress_dialog_text),
                    true, false);
            FileNameAndContacts filenameAndContacts = new FileNameAndContacts();
            if (tvNoDatSelected.getVisibility() != View.VISIBLE) {
                filenameAndContacts.setFileName("");
            }
            createDocument(filenameAndContacts)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnError(throwable -> Timber.e(throwable, "Error in getting document"))
                    .subscribe(fileAndContacts -> {
                        if (ringProgressDialog.isShowing()) { //debugger says 
                                                              //dialog is showing. 
                            ringProgressDialog.dismiss();
                        }
                        sendDocumentInEmail(fileAndContacts);

                    });
     }
Run Code Online (Sandbox Code Playgroud)

任务正常执行。在我在同一个活动中执行另一个 RXJava 任务之前,我还显示了另一个环进度对话框,这个对话框出现了。

如果我注释掉 RxJava 调用,就会出现对话框。所以 RxJava 调用中的某些东西是阻塞的。

//EDIT 这个简单的 …

android progressdialog rx-java

1
推荐指数
1
解决办法
2715
查看次数

如何将 Completable 任务列表合并为一个 Completable 结果?

我有一个返回 a 的方法,Single<List<Item>>我想获取此列表中的每个项目并将其向下游传递给返回Completable. 我想等到每个项目成功完成并返回Completable结果。我最初的做法是分别处理每个项目使用flatMapIterable和组合使用的结果toList,但我不能把toList一个上Completable对象。有没有其他方法可以以这种方式将许多Completable任务“聚合”为一个Completable?这是我到目前为止所拥有的:

public Single<List<Item>> getListOfItems() {
    ...
}

public Completable doSomething(Item item) {
    ...
}

public Completable processItems() {
    return getListOfItems()     
        .toObservable()
        .flatMapIterable(items -> items)
        .flatMapCompletable(item -> doSomething(item))
        .toList()    // ERROR: No method .toList() for Completable
        .ignoreElements();
}
Run Code Online (Sandbox Code Playgroud)

java rx-java rx-android reactivex rx-java2

1
推荐指数
1
解决办法
2457
查看次数

生成无限并行流

问题

嗨,我有一个函数,我将返回无限的并行流(是的,在这种情况下它要快得多)生成的结果。所以很明显(或不)我用过

Stream<Something> stream = Stream.generate(this::myGenerator).parallel()
Run Code Online (Sandbox Code Playgroud)

它有效,但是......当我想限制结果时它不会(当流是顺序的时一切都很好)。我的意思是,当我做类似的事情时,它会产生结果

stream.peek(System.out::println).limit(2).collect(Collectors.toList())
Run Code Online (Sandbox Code Playgroud)

但即使peek输出产生超过 10 个元素,collect仍然没有最终确定(生成很慢,所以这 10个元素甚至可能需要一分钟)......这是一个简单的例子。实际上,限制这些结果是未来,因为主要期望是在用户终止进程之前只获得比最近的结果更好的结果(其他情况是首先返回我可以通过抛出异常findFirst所做的事情,如果没有其他帮助[没有,即使我在控制台上有更多元素并且在大约 30 秒内没有更多结果])。

所以,问题是...

如何复制?我的想法也是使用 RxJava,还有另一个问题 - 如何使用该工具(或其他工具)实现类似的结果。

代码示例

public Stream<Solution> generateSolutions() {
     final Solution initialSolution = initialSolutionMaker.findSolution();
     return Stream.concat(
          Stream.of(initialSolution),
          Stream.generate(continuousSolutionMaker::findSolution)
    ).parallel();
}

new Solver(instance).generateSolutions()
    .map(Solution::getPurpose)
    .peek(System.out::println)
    .limit(5).collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

实施findSolution并不重要。它有一些副作用,比如添加到解决方案仓库(singleton、sych 等),但仅此而已。

java java-8 rx-java java-stream

1
推荐指数
1
解决办法
1027
查看次数

RxAndroid - 使用 Zip 运算符处理错误

我试图找到一种方法来并行执行请求并在每个可观察对象完成时处理它们。尽管当所有 observables 都给出响应时一切都在工作,但我没有看到在一切都完成后处理所有错误的方法。

这是 zip 运算符的示例,它基本上并行执行 2 个请求:

Observable.zip(
                getObservable1()
                        .onErrorResumeNext { errorThrowable: Throwable ->
                            Observable.error(ErrorEntity(Type.ONE, errorThrowable))
                        }.subscribeOn(Schedulers.io()),
                getObservable2()
                        .onErrorResumeNext { errorThrowable: Throwable ->
                            Observable.error(ErrorEntity(Type.TWO, errorThrowable))
                        }.subscribeOn(Schedulers.io()),
                BiFunction { value1: String, value2: String ->
                    return@BiFunction value1 + value2
                })
                //execute requests should be on io() thread
                .subscribeOn(Schedulers.io())
                //there are other tasks inside subscriber that need io() thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                        { result ->
                            Snackbar.make(view, "Replace with your own action " + result, Snackbar.LENGTH_LONG)
                                    .setAction("Action", null).show()
                        },
                        { error ->
                            Log.d("TAG", "Error …
Run Code Online (Sandbox Code Playgroud)

error-handling kotlin rx-java rx-android rx-java2

1
推荐指数
1
解决办法
2246
查看次数

我如何在 android 中通过 RxJava2 实现这一点?

我正在尝试将图像上传到 Firebase 存储,我想通过 RxJava2 实现这一目标。这是我的代码..

public override fun onActivityResult(requestCode: Int, resultCode: Int, data: Intent) {

    if (requestCode == CropImage.CROP_IMAGE_ACTIVITY_REQUEST_CODE) {
        val result = CropImage.getActivityResult(data)
        if (resultCode == Activity.RESULT_OK) {
            val resultUri = result.uri
            var actualImageFile = File(resultUri.path)
            dialogs = SpotsDialog(this, "upload")
            imageCompressor = Compressor(this)
            var image_bitmap = imageCompressor
                    ?.setMaxWidth(200)
                    ?.setMaxHeight(200)
                    ?.setQuality(75)
                    ?.compressToBitmap(actualImageFile)
            profile_image?.setImageBitmap(image_bitmap)
            dialogs?.show()
            try{
            FirebaseStorage.getInstance().reference.child("profile_images").child(FirebaseAuth.getInstance().currentUser?.uid + ".jpg").putFile(resultUri)
                    .addOnCompleteListener { task: Task<UploadTask.TaskSnapshot> ->
                        if (task.isSuccessful) {
                            showMessage("image uploaded")
                            val baos = ByteArrayOutputStream()
                            image_bitmap?.compress(Bitmap.CompressFormat.JPEG, 100, baos)
                            FirebaseStorage.getInstance().reference.child("profile_images").child("thumbs_images").child(FirebaseAuth.getInstance().currentUser?.uid + ".jpg").putBytes(baos.toByteArray())
                                    .addOnCompleteListener { …
Run Code Online (Sandbox Code Playgroud)

android firebase rx-java firebase-storage rx-java2

1
推荐指数
1
解决办法
554
查看次数

为什么不建议在一个管道中修改对象

我有一个关于 RxJava 的一般性问题。我在很多地方看到说完全不建议在一个管道步骤中修改对象。您应该创建一个新的并将其传递下去,或doOnNext()用于副作用。

例如

 Observable.fromIterable(users)
        .map(user -> {
            user.name =//do something with name);
            user.age = // do something with age);
            return user;
        }).subscribe(user -> {

        });
Run Code Online (Sandbox Code Playgroud)

如您所见,用户对象正在管道步骤中被修改。但是这段代码运行良好,没有任何问题。但是为什么它被认为是一种不好的做法呢?任何人都可以更好地解释为什么对象在整个流中应该是不可变的?

rx-java rx-java2

1
推荐指数
1
解决办法
71
查看次数

在 RxJava2 中将 Listener 转换为 Single

我正在使用 Play Services Auth api Phone,到目前为止我有以下内容

  fun startSmsListener() {
    val client = SmsRetriever.getClient(applicationContext /* context */);
    val task = client.startSmsRetriever();
    task.addOnSuccessListener(object : OnSuccessListener<Void> {
        override fun onSuccess(p0: Void?) {
             //do somethin
        }

    })
    task.addOnFailureListener(object : OnFailureListener {
        override fun onFailure(p0: Exception) {
            //Handle error
        }

    })
}
Run Code Online (Sandbox Code Playgroud)

现在我想把它放在一个 SmsManager 类中,并将它转换成一个 Single/Observable,这样我就可以在我的视图模型中以一种反应方式处理它。我怎样才能做到这一点?

到目前为止,我有这个:

var single = Single.create(SingleOnSubscribe<Void> { e ->
                val task = client.startSmsRetriever()
                task.addOnSuccessListener {
                    e.onSuccess(it)
                }
                task.addOnFailureListener {
                    e.onError(it)
                }
        })
Run Code Online (Sandbox Code Playgroud)

但我不确定这段代码是否正确,是否有遗漏的东西,比如在处理后删除监听器。

有什么帮助吗?

android reactive-programming kotlin rx-java rx-java2

1
推荐指数
1
解决办法
832
查看次数

.zip 的可迭代问题(rxJava)

我有四个 Observable 的代码。我Observable.zip对他们应用:

Observable<Currencies> usd = CoinMarket.getMarketApi().getCurrencies();
    Observable<Currencies> rub = CoinMarket.getMarketApi().getCurrencies("RUB");
    Observable<Currencies> eur = CoinMarket.getMarketApi().getCurrencies("EUR");
    Observable<Currencies> btc = CoinMarket.getMarketApi().getCurrencies("BTC");

    List<Observable<Currencies>> singles = new ArrayList<>();
    singles.add(usd);
    singles.add(rub);
    singles.add(eur);
    singles.add(btc);

    Observable<Currencies> c = Observable.zip(singles, new Function<Currencies[], Currencies>() {
                @Override
                public Currencies apply(@NotNull Currencies[] objects) throws Exception {
                    return ramming(objects);
                }
            });
Run Code Online (Sandbox Code Playgroud)

但我有错误:

在此处输入图片说明

该消息表示未找到具有此类签名的方法(也许我不正确),但我去声明并查看签名,有点像,是合适的。

 public static <T, R> Observable<R> zip
 (Iterable<? extends ObservableSource<? extends T>> sources,
 Function<? super Object[], ? extends R> zipper)
Run Code Online (Sandbox Code Playgroud)

我做错了什么?

PS:我认为这个错误RxJava2Iterableisbroken

android rx-java

1
推荐指数
1
解决办法
2377
查看次数

在房间里流动

@Database(entities = {User.class}, version = 2, exportSchema = false)
public abstract class AppDatabase extends RoomDatabase {
    public abstract userDao userDao();
}
Run Code Online (Sandbox Code Playgroud)

Pojo 用户类

@Entity
    public class User {
        @PrimaryKey(autoGenerate = true)
        private int id;

        public User(){
        }



        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }
    }
Run Code Online (Sandbox Code Playgroud)

@Dao
public interface userDao {
            @Query("SELECT * FROM User WHERE id = :id")
            Flowable<User> get(int id);
            @Insert
            Completable insert(User user);
        }
Run Code Online (Sandbox Code Playgroud)

依赖关系

implementation "androidx.room:room-runtime:2.2.3"
annotationProcessor …
Run Code Online (Sandbox Code Playgroud)

java android rx-java android-room

1
推荐指数
1
解决办法
1108
查看次数