如何在RxJava2中链接要批量处理的流程。下面的流程图是我想要实现的。
Flowable#1 Flowable#2 (process every 10)
============== ================================
callServer(p1) ->
: -> saveToDatabase(List<r1 to r10>)
callServer(p20)-> saveToDatabase(List<r11 to r20>)
callServer(p21)-> :
: :
callServer(p35)-> saveToDatabase(List<r31 to r35>) //the remainder
Run Code Online (Sandbox Code Playgroud)
当前,我要等待所有结果返回,然后再保存到数据库中。
Flowable.fromIterable(paramList)
.map(p -> callServer(p))
//wait for the return a map of ALL the results r
//how to chain it such that saveToDatabase process after 'n' results
.toList()
.flatmap(listOfR -> saveToDatabase(listOfR);
Run Code Online (Sandbox Code Playgroud)
我如何做到在每个“ n”个结果之后调用saveToDatabase而不是等待所有结果完成?
嗨,我正在尝试学习 rxjava2。我正在尝试使用 rxjava2 调用 API,并使用改造来构建 URL 并将 JSON 转换为 Moshi。
我想将Observable模式与retrofit. 有谁知道怎么做?任何标准和最佳方法,例如用于错误处理的包装器等等?
应用模块.kt
@Provides
@Singleton
fun provideRetrofit(moshi: Moshi, okHttpClient: OkHttpClient): Retrofit {
return Retrofit.Builder()
.addConverterFactory(MoshiConverterFactory.create(moshi))
.baseUrl(BuildConfig.BASE_URL)
.client(okHttpClient)
.build()
}
Run Code Online (Sandbox Code Playgroud)
ApiHelperImpl.kt
@Inject
lateinit var retrofit: Retrofit
override fun doServerLoginApiCall(email: String, password: String): Observable<LoginResponse> {
retrofit.create(RestApi::class.java).login(email, password)
}
Run Code Online (Sandbox Code Playgroud)
我doServerLoginApiCall从LoginViewModel下面这样打电话
登录视图模型.kt
fun login(view: View) {
if (isEmailAndPasswordValid(email, password)) {
ApiHelperImpl().doServerLoginApiCall(email, password)
}
}
Run Code Online (Sandbox Code Playgroud)
RestApi.kt
interface RestApi {
@FormUrlEncoded
@POST("/partner_login")
fun login(@Field("email") email: String, @Field("password") password: …Run Code Online (Sandbox Code Playgroud) 我有多个热的 observables,它们可能会或可能不会发出项目。因此,我想组合 observables,然后在它们中的任何一个发出结果时处理结果,但如果其他 observables 在 item 处发出,则它们应该一起处理。
例如。
observable1 = PublishSubject<>()
observable2 = PublishSubject<>()
observable1.onNext(1)
observable1.onNext(2)
observable2.onNext("Test")
observable1.onNext(3)
Run Code Online (Sandbox Code Playgroud)
应该发出:
(1, null)
(2, null)
(2, "Test")
(3, "Test")
Run Code Online (Sandbox Code Playgroud)
也有可能observable2在之前被发射observable1
CombineLatest是最接近我需要的,但只有在所有可观察对象至少发出一项时才会发出结果。是否有一个反应式运算符?
我只是从应用程序的共享首选项中读取一个值,因此应该只发出1个值。
SharedPrefAsync:
Observable<Boolean> getSomeValue() {
// retrieve the value asycnhronously
}
Run Code Online (Sandbox Code Playgroud)
用法:
sharedPrefsAsync.getSomeValue().blockingFirst()可行,但sharedPrefsAsync.getSomeValue().blockingSingle()似乎使应用程序崩溃,没有错误日志
我已经阅读了官方文档,但不清楚blockingSingle和之间的区别blockingFirst。
如何找出blockingSingle此处使用的实际问题?
我正在使用Android 的Reactive Network库。我是 RxJava 的绝对初学者,我正在努力解决它。我想做的是:
1]持续观察手机网络连接状态的变化
2]如果手机已连接到网络,请检查一次是否有互联网连接
为此,我在 Kotlin 中有以下代码:
ReactiveNetwork.observeNetworkConnectivity(applicationContext)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { connectivity ->
if (connectivity.state == NetworkInfo.State.CONNECTED) {
ReactiveNetwork.checkInternetConnectivity()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { isConnectedToInternet ->
if (isConnectedToInternet) {
Log.d("VED-APP", "Connected to Internet")
} else {
Log.d("VED-APP", "Not Connected to Internet")
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
然而,这段代码很丑陋,而且非常嵌套。有没有办法清理这段代码?
尽管示例是 Kotlin 中的,但 Java 或 Kotlin 中的答案都会有所帮助。
给定以下代码,onErrorResumeNext 方法只会用“hi”替换列表中的错误项。我将如何让它继续迭代列表的其余部分?
List<Object> list = new ArrayList<>();
list.add("one");
list.add(new Testo());
list.add("two");
list.add("three");
Observable.fromIterable(list)
.map(item -> item.toString())
.onErrorResumeNext(Observable.just("hi"))
.subscribe(item -> System.out.println(item), onError -> System.out.println("error"));
private static class Testo {
@Override
public String toString() {
throw new NullPointerException();
}
}
Run Code Online (Sandbox Code Playgroud) 我在从主题获取最后发出的值时遇到问题
这是我的班级,负责发射和观察电池变化:
class BatteryLevelProvider @Inject constructor(
app: App
) {
private val context: Context = app
private val receiver: PowerConnectionReceiver = PowerConnectionReceiver()
init {
initializeReceiver()
}
private fun initializeReceiver() {
IntentFilter(Intent.ACTION_BATTERY_CHANGED).let { intentFilter ->
context.registerReceiver(receiver, intentFilter)
}
}
companion object {
val batteryLevelSubject = PublishSubject.create<Int>()
}
fun observeBatteryLevel(): Observable<Int> = batteryLevelSubject.distinctUntilChanged()
fun getCurrentBatteryLevel(): Int {
Timber.d("getCurrentBatteryLevel: ENTERED")
val blockingLast = batteryLevelSubject.blockingLast(0)
Timber.d("getCurrentBatteryLevel: $blockingLast")
return blockingLast
}
inner class PowerConnectionReceiver : BroadcastReceiver() {
override fun onReceive(context: Context, intent: Intent) {
val …Run Code Online (Sandbox Code Playgroud) 我正在尝试在 kotlin 中创建 Observable 。但是它在 OnSubscribe 方法上给出了错误未解决的引用
fun getDisposableObserver(): Observable<Background> {
return Observable.create(object :Observable.OnSubscribe<Background> ->{})
}
Run Code Online (Sandbox Code Playgroud)
我试过这个片段。它也不起作用
Observable.create(object : ObservableOn.OnSubscribe<Int> {
override fun call(subscriber: Subscriber<in Int>) {
for(i in 1 .. 5)
subscriber.onNext(i)
subscriber.onCompleted()
}
})
Run Code Online (Sandbox Code Playgroud)
我做错了什么?,如何创建 Observable?
在http://reactivex.io/RxJava/javadoc/io/reactivex/Observable中指出:
默认情况下,Observable 的运算符以 128 个元素的缓冲区大小运行(参见 Flowable.bufferSize(),可以通过系统参数 rx2.buffer-size 全局覆盖。
我的问题是,我如何访问和设置rx2.buffer-size?如果我做:
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import io.reactivex.*;
Run Code Online (Sandbox Code Playgroud)
...其次是:
Integer bufferSize = rx2.buffer-size;
Run Code Online (Sandbox Code Playgroud)
Studio 通知我:
import io.reactivex.*;) 如何覆盖系统参数rx2.buffer-size?
我的项目正在使用
io.reactivex.rxjava2/rxandroid/2.0.1
io.reactivex.rxjava2/rxjava/2.1.11
Run Code Online (Sandbox Code Playgroud) 我有两个可观察者,我想从每个观察者中取出,直到两者都没有.有没有运营商这样做?让我向您展示我尝试过的内容以及我想要完成的内容:
Observable<String> observable = Observable.just("hello","are", "doing");
Observable<String> observable2 = Observable.just("how","you","today");
Run Code Online (Sandbox Code Playgroud)
我想"减少"这样,以便最终的排放量是" 你好,今天你好吗?
在这里我尝试使用扫描,但它更像是积累,我得到以下结果:
Observable.merge(observable, observable2).reduce(new BiFunction<String, String, String>() {
@Override
public String apply(String wordAccum1, String word2) {
return wordAccum1 + " " + word2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String sentence) throws Exception {
Log.v("consumerResult",sentence+"");
}
});
Run Code Online (Sandbox Code Playgroud)
日志输出显示:consumerResult:你好今天的做法
我怎么能连续从每个人那里拿走?