标签: rx-java3

io.reactivex.exceptions.UndeliverableException 异常无法传递给消费者,因为它已经被取消/处置

使用时出现 UndeliverableExceptioncompletable

public Completable createBucketWithStorageClassAndLocation() {
        return Completable.complete()
                .doFinally(() -> {
            Bucket bucket =
                    storage.create(
                            BucketInfo.newBuilder(googleUploadObjectConfiguration.bucketName())
                                    .setStorageClass(storageClass)
                                    .setLocation(googleUploadObjectConfiguration.locationName())
                                    .build());       
        }).doOnError(error -> LOG.error(error.getMessage()));
    }
Run Code Online (Sandbox Code Playgroud)

异常是从 Google 存储中抛出的,这是正确的,但尝试处理doOnError方法

Caused by: com.google.cloud.storage.StorageException: You already own this bucket. Please select another name.
Run Code Online (Sandbox Code Playgroud)

RXJava 异常

io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | com.google.cloud.storage.StorageException: You already own this bucket. …
Run Code Online (Sandbox Code Playgroud)

java rx-java reactivex rx-java2 rx-java3

15
推荐指数
2
解决办法
3万
查看次数

如何将挂起函数转换为 RX Single(或 Completable)?

我们正在将我们的项目从 RX 重构为 Kotlin 协程,但不是一次性完成的,因此我们需要我们的项目同时使用两者一段时间。

现在我们有很多像这样使用 RX single 作为返回类型的方法,因为它们是繁重、长时间运行的操作,例如 API 调用。

fun foo(): Single<String> // Heavy, long running opertation

我们希望它是这样的:

suspend fun foo(): String // The same heavy, long running opertation

当我们使用这种方法时,我们仍然希望使用 RX。

我们一直在这样做:

foo()
    .subscribeOn(Schedulers.io())
    .map { ... }
    .subscribe { ... }
Run Code Online (Sandbox Code Playgroud)

现在我应该如何将我的挂起乐趣转换为生成一个我可以使用的 Single?

这是一个好主意吗?

Single.fromCallable {
    runBlocking {
        foo() // This is now a suspend fun
    }
}
    .subscribeOn(Schedulers.io())
    .map { ... }
    .subscribe { ... }
Run Code Online (Sandbox Code Playgroud)

kotlin rx-java kotlin-coroutines rx-java3

6
推荐指数
1
解决办法
5903
查看次数

RxJava 3 对 Room 的支持

我在我的项目中将 RxJava3 与 Room 一起使用,但出现以下错误

错误:不确定如何将 Cursor 转换为该方法的返回类型 (io.reactivex.rxjava3.core.Flowable>)

下面是我收到错误的 DAO 接口方法

@Query("SELECT * FROM wishlist_table")
Flowable<List<WishListMovie>> getWishList();
Run Code Online (Sandbox Code Playgroud)

我想可能是因为我在成绩文件中使用了以下依赖项:

implementation "androidx.room:room-rxjava2:$room_version"
Run Code Online (Sandbox Code Playgroud)

我试图找到 RxJava 3 的上述依赖项,但我找不到它。

我想知道如何在 Room 中使用 RxJava 3 或者我应该在我的项目中使用 RxJava 2。

android rx-java android-room rx-java3

5
推荐指数
2
解决办法
3254
查看次数

Retrofit2 和 rxjava3:java.lang.IllegalArgumentException:无法找到 io.reactivex.rxjava3.core.Observable 的调用适配器

我想使用 Retrofit2 和 rxjava3 但我看到以下错误

Caused by: java.lang.IllegalArgumentException: Could not locate call adapter for io.reactivex.rxjava3.core.Observable<java.lang.Object>.
      Tried:
       * retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory
       * retrofit2.CompletableFutureCallAdapterFactory
       * retrofit2.DefaultCallAdapterFactory
Run Code Online (Sandbox Code Playgroud)

此错误表明Rxjava 没有适配器,但我将其添加到下面的行中

.addCallAdapterFactory(RxJava2CallAdapterFactory.create())

public class ServiceGenerator {
  private static final String BASE_URL = "http://example.com/";
  private static final OkHttpClient okHttpClient = new OkHttpClient.Builder()

    .connectTimeout(30, TimeUnit.SECONDS)
    .writeTimeout(30, TimeUnit.SECONDS)
    .readTimeout(30, TimeUnit.SECONDS)
    .build();

  public static <S> S createServiceSample(Class<S> serviceClass) {
    Retrofit.Builder builder =
      new Retrofit.Builder()
        .client(okHttpClient)
        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
        .addConverterFactory(GsonConverterFactory.create());
    builder.baseUrl(BASE_URL);
    Retrofit retrofit = builder.build();
    return retrofit.create(serviceClass);
  }

}
Run Code Online (Sandbox Code Playgroud)

和我的 Api 接口 …

android adapter rx-java retrofit2 rx-java3

4
推荐指数
1
解决办法
3321
查看次数

RXJava 的问题

我正在改编来自三词地址的一些示例代码,以便通过 Java SDK 访问他们的 API。它使用 RXJava。

示例代码是:

Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(result -> {
            if (result.isSuccessful()) {
                Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
            } else {
                Log.e("MainActivity", result.getError().getMessage());
            }
        });
Run Code Online (Sandbox Code Playgroud)

首先。这会在构建时给出弃用警告以及 IDE 警告 ( Result of 'Observable.subscribe()' is ignored)。

为了解决第一个问题,我Disposable myDisposable = Observable. 它是否正确?(添加位置见下文)

接下来,我需要添加超时,以便在请求超时时可以显示警告等。为此,我已添加.timeout(5000, TimeUnit.MILLISECONDS)到构建器中。

timeout这是可行的,但s 似乎对 s 起作用的方式Observable是它们抛出异常,而我不知道如何捕获和处理该异常。

我现在拥有的是:

Disposable myDisposable = Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .timeout(5000, TimeUnit.MILLISECONDS)
        .subscribe(result -> {
            if (result.isSuccessful()) …
Run Code Online (Sandbox Code Playgroud)

java android rx-java rx-java2 rx-java3

1
推荐指数
1
解决办法
2287
查看次数