标签: rx-java2

如何在 RxJava 2 自定义运算符的 onSubscribe 上获取 RetroFit 请求的 URL 和方法?

因此,我尝试集成 Http 请求的 Firebase 性能并手动添加它们,如此处所示步骤 9)。

我正在使用 Retrofit 2 和 RxJava 2,所以我想到了做一个自定义运算符,检查下面的代码:

改造 2 客户端

@GET("branch-{environment}/v2/branches")
    fun getBranch(@Path("environment") environment: String, @Query("location") location: String, @Query("fulfilment_type") fulfilmentType: String): Single<Response<GetBranchResponse>>
Run Code Online (Sandbox Code Playgroud)

RxJava 调用 Retrofit 客户端

private val client: BranchClient = clientFactory.create(urlProvider.apiUrl)

override fun getBranch(postCode: String, fulfilmentType: FulfilmentType): Single<GetBranchResponse> {
        return client
                .getBranch(environment, postCode.toUpperCase(), fulfilmentType.toString())
                .lift(RxHttpPerformanceSingleOperator(URL?, METHOD?))
                .map { it.body() }
                .subscribeIO() //custom Kotlin extension 
                .observeMain() //custom Kotlin extension 
                ...
    } 
Run Code Online (Sandbox Code Playgroud)

RxJava 2 自定义操作符通过 lift:

class RxHttpPerformanceSingleOperator<T>(private val url: String, private val method: …
Run Code Online (Sandbox Code Playgroud)

kotlin firebase retrofit2 rx-java2 firebase-performance

2
推荐指数
1
解决办法
1244
查看次数

如何仅当响应为 404 时才在 RxJava2 中重试

我使用 Retrofit 和 RxJava2 实现了一个重试调用,但是我需要它仅在您获得与 404 不同的代码时重试。重试 404 是没有意义的。这就是我正在使用的

new RequestFactory()
    .requestBuilder
    .create(Service.class)
    .getData(id)
    .map(response -> response.object)
    .doOnError(t -> Log.e(NET, "Error fetching data id '" + id + "': " + t))
    .retry(3)
    .onErrorResumeNext(Observable.empty())
    .subscribeOn(Schedulers.io())
Run Code Online (Sandbox Code Playgroud)

java retrofit rx-java2

2
推荐指数
1
解决办法
1946
查看次数

Java 8“可选”不好的做法?

我有一个代码:

val d = Single
            .zip<List<X>, Optional<Y>, DataContent>(
                    xSingle,
                    YSingle,
                    BiFunction { x, y ->
                        val b = if (y.isPresent()) {
                            y.get()
                        } else {
                            null
                        }
                        return@BiFunction DataContent(x, b)
                    })
            .subscribe({ data ->
                ...
            }, { t ->
                ...
            })
Run Code Online (Sandbox Code Playgroud)

我听说,使用Optional检查示例中所示的空值是不好的做法。真的吗?为什么?有人可以展示使用的替代方案RxJava2吗?

android kotlin option-type rx-java2

2
推荐指数
1
解决办法
5461
查看次数

处置可观察量

这个问题与Android和生命周期有关。以前,我会有一系列主题并在创作时订阅它们。

销毁后,我会将所有主题标记为完整,假设它处理了所有订阅者。

在 Android Studio 3.1 中,我会收到针对“未使用”的任何订阅者的警告。解决方案是将它们添加到“完全一次性”中,然后我在销毁时进行处理。

我需要“复合一次性”来在销毁时正确取消请求吗?我之前将主题标记为完整的方法有什么作用吗?在这种情况下有必要吗?

作为代码示例:

val observable: PublishSubject<Int> = PublishSubject.create()
val disposable = observable.subscribe { /* subscription */ }

fun onDestroy() {
    observable.onComplete() // is this line necessary or helpful?
    disposable.dispose()
}
Run Code Online (Sandbox Code Playgroud)

android rx-java2 rx-kotlin2

2
推荐指数
1
解决办法
5647
查看次数

RxJava2 - 同步执行调用

我有一个TestService,我在其中执行异步任务来获取数据。我想等待回复后再继续。

public List<Data> getData() {
    List<Data> data = new ArrayList<>();

    Disposable disposable = repository.getDataFromApi(false)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe( newData -> {
                data.addAll(newData);
            }, __ -> { });
    mCompositeDisposable.add(disposable);

    //Here I want to stop till "Data" arraylist is filled with data 

    ... do something with data        
}
Run Code Online (Sandbox Code Playgroud)

在 Volley 中,我只需打电话 req.executeSynchronously();就能实现。由于getData()必须已经返回数据,我必须以某种方式让它等到我得到响应。怎么做?我在用着Single

我的方法是使用 getBlocking();


public List<Data> getData() {
    List<Data> data = new ArrayList<>();

    Disposable disposable = repository.getDataFromApi(false)
            .observeOn(AndroidSchedulers.mainThread())
            .blockingGet();
            .subscribe( newData -> {
                data.addAll(newData);
            }, __ -> …
Run Code Online (Sandbox Code Playgroud)

android rx-java2

2
推荐指数
1
解决办法
3025
查看次数

RxJava:根据条件完成流

我有 Observable,当某些数据来自 BLE 连接时,它会发出项目:

public interface CommunicationController {
     Flowable<DataContainer> dataReceived();
}
Run Code Online (Sandbox Code Playgroud)

除此之外,我想构建一个Observablecompletes当满足以下条件之一时:

A。我收到两条特定类型的消息(这是通过filter在收到的DataContainer项目上使用运算符来完成的。

communicationController.dataReceived()
    .filter(data -> isTypeA(data) || isTypeB(data))
    .take(2)
    .toList()
    .map(dataContainers -> doSomeMappingToCommon object) 
Run Code Online (Sandbox Code Playgroud)

b. 我收到一条特定类型的消息(再次使用filter运算符)。

communicationController.dataReceived()
    .filter(data -> isTypeC(data))
    .firstOrError()
    .map(dataContainers -> doSomeMappingToCommon object); 
Run Code Online (Sandbox Code Playgroud)

我怎样才能将这两个合并Observable为一个?此外,两个 Observable 中只有一个会发出一个项目。

android rx-java2

2
推荐指数
1
解决办法
1188
查看次数

使用 dispose handler rxjava 创建 Observable

我们想要观察视图大小的变化,我们创建一个如下的扩展:

fun View.layoutSizeObservable(): io.reactivex.Observable<Size> {
    return io.reactivex.Observable.create<Size> { emitter ->
        viewTreeObserver.addOnGlobalLayoutListener {
            Log.d("MainFragment", "ViewTreeObserver Listener called back.")
            if (measuredWidth > 0 && measuredHeight > 0) {
                emitter.onNext(Size(measuredWidth, measuredHeight))
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

然后我们像这样使用它,它的功能很好:

sizeChangedDisposable = titleTextView.layoutSizeObservable().subscribe { size: Size ->
    Log.d("MainFragment", "Size changed subscribe on $size")
}
Run Code Online (Sandbox Code Playgroud)

然而,有一点并不完全是我们想要的,它是Listener通过 添加的addOnGlobalLayoutListener,但从未被删除。

我们可以调用sizeChangedDisposable.dispose()它将正确停止订阅:

D/MainFragment:大小已更改订阅 $size

但这将继续被调用:

D/MainFragment:回调的ViewTreeObserver监听器。

我们如何以及在哪里删除布局侦听器回调?

android kotlin rx-java2

2
推荐指数
1
解决办法
1555
查看次数

如何在 RxJava 中处理 dispose 而不会出现 InterruptedException

在下面的代码中,当dispose()被调用时,发射器线程被中断(InterruptedException被抛出 sleep 方法)。

    Observable<Integer> obs = Observable.create(emitter -> {
        for (int i = 0; i < 10; i++) {
            if (emitter.isDisposed()) {
                System.out.println("> exiting.");
                emitter.onComplete();
                return;
            }

            emitter.onNext(i);
            System.out.println("> calculation = " + i);


            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        emitter.onComplete();
    });

    Disposable disposable = obs
            .subscribeOn(Schedulers.computation())
            .subscribe(System.out::println);

    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    disposable.dispose();
Run Code Online (Sandbox Code Playgroud)

从调试会话中我看到中断源FutureTask在处理期间被取消。在那里,将对照运行线程检查dispose()正在调用的线程,如果不匹配,则发射器被中断。由于我使用了计算,所以线程有所不同Scheduler

有什么方法可以使处理不中断此类发射器,或者实际上应该如何处理?我在这种方法中看到的一个问题是,当我有一个可中断操作(此处通过 sleep 模拟)时,我希望在调用 之前正常完成该操作onComplete() …

java interrupted-exception rx-java rx-java2

2
推荐指数
1
解决办法
3202
查看次数

将 Rxjava Completable 转换为 Map

我需要在链中调用一个可完成的方法,完成后,它需要使用Map运算符
示例继续链

Single.just(someOperation())
     .flatMapCompletable{
         completableMethod()
    }
     .map{ // i need to continue here with map or some other operator
         doSomeOperation()
    }
Run Code Online (Sandbox Code Playgroud)

谁能帮我解决这个问题吗?

android rx-java rx-java2 rx-kotlin2

2
推荐指数
1
解决办法
909
查看次数

当多个表发生更改时,防止 Room Observable 查询多次触发

我在 Android Room 中使用可观察查询来触发更新,最终在底层数据发生变化时改变 UI。

有时这些查询涉及多个表,有时用户执行将新值插入到这些表中的操作。插入通常一个接一个地快速完成(即在不到二分之一秒的时间跨度内)

目前,从 Room 订阅更新的观察者将被触发 x 次,其中 x 是查询中已更新的表的数量。

因此,如果值在一秒内插入到查询中的六个不同表中,则任何观察该查询的观察者都会触发六次。

有没有办法告诉 Room;“每秒最多触发观察者一次”?或者,更好的是“当您看到更新时,等待半秒以收集任何其他更新,然后立即通知观察者所有内容”?

这是房间查询。正如您所看到的,涉及多个表。

 @Query("SELECT Distinct order_material.* FROM order_material" +
            " JOIN orders on order_material.orderId = orders.id" +
            " JOIN assignment on order_material.id = assignment.orderMaterialID" +
            " JOIN box_inside on assignment.boxInsideID = box_inside.id" +
            " WHERE orders.isActive = 1 " +
            " AND (box_inside.dateCFOrigPacked <= :dateLimitYema OR box_inside.stemLength IS NOT NULL)" +
            " AND assignment.id IN (SELECT id FROM assignment WHERE active = 1" +
            "      AND …
Run Code Online (Sandbox Code Playgroud)

rx-java2 android-room

2
推荐指数
1
解决办法
698
查看次数