标签: rx-java2

如何在RxJava2的链中批量处理Flowable输出

如何在RxJava2中链接要批量处理的流程。下面的流程图是我想要实现的。

Flowable#1          Flowable#2 (process every 10)
==============     ================================
callServer(p1) ->
      :        ->  saveToDatabase(List<r1 to r10>)
callServer(p20)->  saveToDatabase(List<r11 to r20>)
callServer(p21)->      :
      :                :
callServer(p35)->  saveToDatabase(List<r31 to r35>) //the remainder
Run Code Online (Sandbox Code Playgroud)

当前,我要等待所有结果返回,然后再保存到数据库中。

Flowable.fromIterable(paramList)
    .map(p -> callServer(p)) 
    //wait for the return a map of ALL the results r  
    //how to chain it such that saveToDatabase process after 'n' results 
    .toList()  
    .flatmap(listOfR -> saveToDatabase(listOfR); 
Run Code Online (Sandbox Code Playgroud)

我如何做到在每个“ n”个结果之后调用saveToDatabase而不是等待所有结果完成?

rx-java rx-java2

0
推荐指数
1
解决办法
854
查看次数

如何在android中使用rxjava2进行改造

嗨,我正在尝试学习 rxjava2。我正在尝试使用 rxjava2 调用 API,并使用改造来构建 URL 并将 JSON 转换为 Moshi。

我想将Observable模式与retrofit. 有谁知道怎么做?任何标准和最佳方法,例如用于错误处理的包装器等等?

应用模块.kt

@Provides
@Singleton
fun provideRetrofit(moshi: Moshi, okHttpClient: OkHttpClient): Retrofit {
    return Retrofit.Builder()
            .addConverterFactory(MoshiConverterFactory.create(moshi))
            .baseUrl(BuildConfig.BASE_URL)
            .client(okHttpClient)
            .build()
}
Run Code Online (Sandbox Code Playgroud)

ApiHelperImpl.kt

@Inject
lateinit var retrofit: Retrofit

override fun doServerLoginApiCall(email: String, password: String): Observable<LoginResponse> {
    retrofit.create(RestApi::class.java).login(email, password)
}
Run Code Online (Sandbox Code Playgroud)

doServerLoginApiCallLoginViewModel下面这样打电话

登录视图模型.kt

fun login(view: View) {
    if (isEmailAndPasswordValid(email, password)) {
        ApiHelperImpl().doServerLoginApiCall(email, password)
    }
}
Run Code Online (Sandbox Code Playgroud)

RestApi.kt

interface RestApi {

    @FormUrlEncoded
    @POST("/partner_login")
    fun login(@Field("email") email: String, @Field("password") password: …
Run Code Online (Sandbox Code Playgroud)

android kotlin retrofit2 rx-java2

0
推荐指数
1
解决办法
2086
查看次数

Rxjava:如何在没有完成所有 observable 的情况下组合多个 observable?

我有多个热的 observables,它们可能会或可能不会发出项目。因此,我想组合 observables,然后在它们中的任何一个发出结果时处理结果,但如果其他 observables 在 item 处发出,则它们应该一起处理。

例如。

observable1 = PublishSubject<>()  
observable2 = PublishSubject<>()

observable1.onNext(1)  
observable1.onNext(2)  
observable2.onNext("Test")  
observable1.onNext(3)
Run Code Online (Sandbox Code Playgroud)

应该发出:

(1, null) 
(2, null)
(2, "Test")
(3, "Test")
Run Code Online (Sandbox Code Playgroud)

也有可能observable2在之前被发射observable1

CombineLatest是最接近我需要的,但只有在所有可观察对象至少发出一项时才会发出结果。是否有一个反应式运算符?

java reactive-programming rx-java rx-java2

0
推荐指数
1
解决办法
1309
查看次数

为什么blockingSingle崩溃但blockingFirst有效

我只是从应用程序的共享首选项中读取一个值,因此应该只发出1个值。

SharedPrefAsync:

Observable<Boolean> getSomeValue() {
  // retrieve the value asycnhronously
}
Run Code Online (Sandbox Code Playgroud)

用法:
sharedPrefsAsync.getSomeValue().blockingFirst()可行,但sharedPrefsAsync.getSomeValue().blockingSingle()似乎使应用程序崩溃,没有错误日志

我已经阅读了官方文档,但不清楚blockingSingle和之间的区别blockingFirst

如何找出blockingSingle此处使用的实际问题?

android rx-java rx-java2

0
推荐指数
1
解决办法
612
查看次数

如何避免 RxJava 中的嵌套回调?

我正在使用Android 的Reactive Network库。我是 RxJava 的绝对初学者,我正在努力解决它。我想做的是:

1]持续观察手机网络连接状态的变化

2]如果手机已连接到网络,请检查一次是否有互联网连接

为此,我在 Kotlin 中有以下代码:

    ReactiveNetwork.observeNetworkConnectivity(applicationContext)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe { connectivity ->
                if (connectivity.state == NetworkInfo.State.CONNECTED) {
                    ReactiveNetwork.checkInternetConnectivity()
                            .subscribeOn(Schedulers.io())
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe { isConnectedToInternet ->
                                if (isConnectedToInternet) {
                                    Log.d("VED-APP", "Connected to Internet")
                                } else {
                                    Log.d("VED-APP", "Not Connected to Internet")
                                }
                            }
                } 
            }
Run Code Online (Sandbox Code Playgroud)

然而,这段代码很丑陋,而且非常嵌套。有没有办法清理这段代码?

尽管示例是 Kotlin 中的,但 Java 或 Kotlin 中的答案都会有所帮助。

android kotlin rx-java rx-java2

0
推荐指数
1
解决办法
365
查看次数

如果 RxJava 失败,如何恢复映射列表

给定以下代码,onErrorResumeNext 方法只会用“hi”替换列表中的错误项。我将如何让它继续迭代列表的其余部分?

List<Object> list = new ArrayList<>();
list.add("one");
list.add(new Testo());
list.add("two");
list.add("three");    

Observable.fromIterable(list)
    .map(item -> item.toString())
    .onErrorResumeNext(Observable.just("hi"))
    .subscribe(item -> System.out.println(item), onError -> System.out.println("error"));

private static class Testo {
    @Override
    public String toString() {
        throw new NullPointerException();
    }
}
Run Code Online (Sandbox Code Playgroud)

rx-java reactivex rx-java2

0
推荐指数
1
解决办法
385
查看次数

PublishSubject blocksLast() 挂起 Android 应用程序而不调用

我在从主题获取最后发出的值时遇到问题

这是我的班级,负责发射和观察电池变化:

class BatteryLevelProvider @Inject constructor(
app: App
) {

private val context: Context = app
private val receiver: PowerConnectionReceiver = PowerConnectionReceiver()

init {
initializeReceiver()
}

private fun initializeReceiver() {
IntentFilter(Intent.ACTION_BATTERY_CHANGED).let { intentFilter ->
  context.registerReceiver(receiver, intentFilter)
  }
}

companion object {
 val batteryLevelSubject = PublishSubject.create<Int>()
}

fun observeBatteryLevel(): Observable<Int> = batteryLevelSubject.distinctUntilChanged()

fun getCurrentBatteryLevel(): Int {
Timber.d("getCurrentBatteryLevel: ENTERED")
val blockingLast = batteryLevelSubject.blockingLast(0)
Timber.d("getCurrentBatteryLevel: $blockingLast")
return blockingLast
}

inner class PowerConnectionReceiver : BroadcastReceiver() {

override fun onReceive(context: Context, intent: Intent) {
  val …
Run Code Online (Sandbox Code Playgroud)

android kotlin rx-java rx-java2

0
推荐指数
1
解决办法
596
查看次数

无法在 Kotlin 中创建 Observable

我正在尝试在 kotlin 中创建 Observable 。但是它在 OnSubscribe 方法上给出了错误未解决的引用

  fun getDisposableObserver(): Observable<Background> {

        return Observable.create(object :Observable.OnSubscribe<Background> ->{})
    }
Run Code Online (Sandbox Code Playgroud)

我试过这个片段。它也不起作用

  Observable.create(object : ObservableOn.OnSubscribe<Int> {
            override fun call(subscriber: Subscriber<in Int>) {
                for(i in 1 .. 5)
                    subscriber.onNext(i)

                subscriber.onCompleted()
            }
        })
Run Code Online (Sandbox Code Playgroud)

我做错了什么?,如何创建 Observable?

android kotlin rx-android rx-java2

0
推荐指数
1
解决办法
852
查看次数

如何在 RxJava 2 中访问和增加“rx2.buffer-size”?

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable中指出:

默认情况下,Observable 的运算符以 128 个元素的缓冲区大小运行(参见 Flowable.bufferSize(),可以通过系统参数 rx2.buffer-size 全局覆盖。

我的问题是,我如何访问和设置rx2.buffer-size?如果我做:

import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import io.reactivex.*;
Run Code Online (Sandbox Code Playgroud)

...其次是:

Integer bufferSize = rx2.buffer-size;
Run Code Online (Sandbox Code Playgroud)

Studio 通知我:

  • 未使用的导入语句(for import io.reactivex.*;
  • 无法解析符号“rx2”
  • 无法解析符号“大小”

如何覆盖系统参数rx2.buffer-size

我的项目正在使用

io.reactivex.rxjava2/rxandroid/2.0.1
io.reactivex.rxjava2/rxjava/2.1.11
Run Code Online (Sandbox Code Playgroud)

rx-android rx-java2

0
推荐指数
1
解决办法
831
查看次数

rxjava - 如何从两个可观测量中获取交替排放

我有两个可观察者,我想从每个观察者中取出,直到两者都没有.有没有运营商这样做?让我向您展示我尝试过的内容以及我想要完成的内容:

  Observable<String> observable = Observable.just("hello","are", "doing");
        Observable<String> observable2 = Observable.just("how","you","today");
Run Code Online (Sandbox Code Playgroud)

我想"减少"这样,以便最终的排放量是" 你好,今天你好吗?

在这里我尝试使用扫描,但它更像是积累,我得到以下结果:

Observable.merge(observable, observable2).reduce(new BiFunction<String, String, String>() {
        @Override
        public String apply(String wordAccum1, String word2) {
            return wordAccum1 + " " + word2;
        }

    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String sentence) throws Exception {
            Log.v("consumerResult",sentence+"");
        }
    });
Run Code Online (Sandbox Code Playgroud)

日志输出显示:consumerResult:你好今天的做法

我怎么能连续从每个人那里拿走?

rx-java2

0
推荐指数
1
解决办法
50
查看次数