正如标题所示,我想使用 RxJava2 和 okhttp 下载文件。而且还需要过程。
在我看来,数据流是String url
-> Response
-> 乘法Float process
。
所以首先我需要一个Single
发出Response
.
Single.create((SingleOnSubscribe<Response>) emitter -> {
emitter.onSuccess(xx);// or throw error
});
Run Code Online (Sandbox Code Playgroud)
接下来我需要Flowable
对吗?因为我想向观察者发送多个进程信息。
那么这里我该怎么办呢?提前致谢。
我尝试转换Single
为Flowable
.
Single.create((SingleOnSubscribe<Response>) emitter -> {
emitter.onSuccess(xxx);// or throw error
}).toFlowable()
.flatMap((Function<Response, Publisher<Float>>) response -> Flowable.create(xxx))
.subscribe(process -> {});
}
Run Code Online (Sandbox Code Playgroud)
但我不知道这样做是否合适。
我不关心 okhttp 细节,我只关心与RxJava
诸如Single
和相关的内容Flowable
。
我正在尝试创建 Observable,它将在网络连接建立时重试。
我创建了主题:
private val retrySubject = PublishSubject.create<Unit>()()
Run Code Online (Sandbox Code Playgroud)
我这样使用它:
private fun publishNetworkReconnection() {
compositeDisposable?.add(
connectionHelper.observeConnection()
.subscribe {connected: Boolean
if(connected){
retrySubject.onNext(null)
}
}
)
}
Run Code Online (Sandbox Code Playgroud)
然后我尝试在 retryWhen 运算符中使用它:
val disposable =
Flowable.interval(0, UPDATE_INTERVAL, TimeUnit.SECONDS, Schedulers.io())
.onBackpressureDrop()
.flatMapCompletable {
revocationRepository.sync(event.id)
}
.retryWhen { retryHandler -> retryHandler.flatMap({ nothing -> retrySubject.asObservable() }) }
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ }, { Timber.e(it, "Unable to sync blacklist") })
compositeDisposable?.add(disposable)
}
Run Code Online (Sandbox Code Playgroud)
这种情况下该如何正确使用呢?
我在 Android Studio 中收到此错误:
类型不匹配。必填:发布者<< out (???..???) >>! 发现:可观察<无效!>!
我是新手,我要求最好的正确方法来处理 Retrofit 使用中的所有可能状态,其中Rxjava
包括:Retrofit
rxjava
rxbinding
Username or password is incorrect
。connection reset by peers
.是否可以在订阅后发送给另一个merge
人,以便他们可以看到结果?Observable
Observer
我知道这行不通,但我的意思是这样的:
observable.subscribe( (i) -> doSomething(i) );
observable.merge(anotherObservable);
Run Code Online (Sandbox Code Playgroud)
然后doSomething
也会收到 的值anotherObservable
。
Observables
另外,即使所有合并已经完成,是否有办法继续合并?
将此 RxJava 链转换为 Kotlin 协程的最佳方法是什么?
RxTextView
.textChanges(inputEditText)
.map { it.toString() }
.throttleLatest(500, TimeUnit.MILLISECONDS, true)
.distinctUntilChanged()
.subscribe({ text ->
// More logic here
}, { error ->
// Error handling
})
Run Code Online (Sandbox Code Playgroud) 使用 Retrofit2 和 rxjava2 未在 Recyclerview 中设置 Gson Convertable 数据,然后通过其订阅给出错误:
UninitializedPropertyAccessException: lateinit property data has not been initialized
Run Code Online (Sandbox Code Playgroud)
通过retrofit2和rxjava2解析JSON数据。解析GSON数据转换GSon时,rxjava2订阅数据然后给出lateinit属性错误并且它没有在recyclerview中设置。
MainActivity.kt
class Company : AppCompatActivity() {
internal lateinit var api : APIInterface
var compositeDisposable = CompositeDisposable()
internal lateinit var companyDialog : Dialog
internal lateinit var adapter: CompanyAdapter
internal lateinit var data : List<Company>
internal lateinit var rvCompany : RecyclerView
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_company)
companyDialog = Dialog(this)
//companyAdapter = CompanyAdapter()
btnSelectCompany.setOnClickListener{
showCompanyPopupView()
}
}
fun showCompanyPopupView(){
companyDialog.setContentView(R.layout.compny_popup_screen)
rvCompany = …
Run Code Online (Sandbox Code Playgroud) 我对互联网、数据库进行了一系列调用,结果我向用户显示收集到的信息。现在我有非常难看的三层嵌套 RxJava 流。我真的想让它变得流畅且易于阅读,但我真的很难坚持下去。
我已经阅读了有关 Map、flatMap、zip 等的所有内容。不能让这些东西一起工作。
代码:进行api调用。接收到的信息放入数据库中,在第一个流的 onSuccess 方法中订阅另一个流,并在第二个流的 onSuccess 方法中从数据库接收到的信息最终显示给用户。
达特·弗兰肯斯坦:
disposables.add(modelManager.apiCall()
.subscribeOn(Schedulers.io())
.observeOn(mainThread)
.subscribeWith(new DisposableSingleObserver {
public void onSuccess(ApiResponse apiResponse) {
modelManager.storeInDatabase(apiResponse)
//level 1 nested stream:
disposables.add(modelManager.loadFromDatabas()
.subscribeOn(Schedulers.io())
.observeOn(mainThread)
.subscribeWith(new DisposableSingleObserver{
public void onSuccess(Data data) {
view.showData(data);
}
public void onError(Throwable e) {
}
}));
}
@Override
public void onError(Throwable e) {
}
}));
}
Run Code Online (Sandbox Code Playgroud) 我对 RxJava2 和流(Observable 和 Observer)有一些问题实际上,我的流返回一个错误
The mapper function returned a null value.
io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:57)
retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:51)
retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37)
retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:47)
Run Code Online (Sandbox Code Playgroud)
我的 logcat 不显示代码中断的位置。
public static Observable<List<Business.Response.Doc>> streamArticles_Business() {
NYT_TopStories_services nyt_services = NYT_TopStories_services.retrofit.create(NYT_TopStories_services.class);
return nyt_services.getArticles_Business()
.map(Business.Response::getDocs)
.filter(str -> str != null)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.timeout(20, TimeUnit.SECONDS);
}
Run Code Online (Sandbox Code Playgroud)
和我的改造得到
@GET("search/v2/articlesearch.json?q=business&api-key=" + apiKey)
Observable<Business.Response> getArticles_Business();
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://api.nytimes.com/svc/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
Run Code Online (Sandbox Code Playgroud)
你有办法解决这个问题吗?
我想知道将 APIKEY 放入所有 REST 请求中的最佳方法,而不必将其添加到请求的参数中。
目前我只接到了几个电话,但我正在尝试进一步了解。
@GET(".")
fun getSearch(@Query("s") text: String, @Query("apikey") APIKEY: String) : Observable<ResponseSearch>
Run Code Online (Sandbox Code Playgroud)
我想知道是否有办法不在每次调用的变量中包含 APIKEY
我想限制/test
API 调用每 3 秒调用一次,例如:
2021-09-21 14:09:19.920 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.031 V/OkHttp: <-- 200 https://xxx/test (109ms)
2021-09-21 14:09:20.038 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.136 V/OkHttp: <-- 200 https://xxx/test (96ms)
2021-09-21 14:09:20.146 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.315 V/OkHttp: <-- 200 https://xxx/test (168ms)
2021-09-21 14:09:20.325 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.499 V/OkHttp: <-- 200 https://xxx/test (172ms)
2021-09-21 14:09:20.514 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.615 V/OkHttp: <-- 200 https://xxx/test (100ms)
2021-09-21 14:09:20.628 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.721 V/OkHttp: …
Run Code Online (Sandbox Code Playgroud)