标签: rx-java2

使用 RxJava2 下载带有进程的文件

正如标题所示,我想使用 RxJava2 和 okhttp 下载文件。而且还需要过程。

在我看来,数据流是String url-> Response-> 乘法Float process

所以首先我需要一个Single发出Response.

Single.create((SingleOnSubscribe<Response>) emitter -> {
    emitter.onSuccess(xx);// or throw error
});
Run Code Online (Sandbox Code Playgroud)

接下来我需要Flowable对吗?因为我想向观察者发送多个进程信息。

那么这里我该怎么办呢?提前致谢。

编辑

我尝试转换SingleFlowable.

Single.create((SingleOnSubscribe<Response>) emitter -> {
    emitter.onSuccess(xxx);// or throw error
}).toFlowable()
        .flatMap((Function<Response, Publisher<Float>>) response -> Flowable.create(xxx))
        .subscribe(process -> {});

}
Run Code Online (Sandbox Code Playgroud)

但我不知道这样做是否合适。

编辑

我不关心 okhttp 细节,我只关心与RxJava诸如Single和相关的内容Flowable

rx-java rx-java2

5
推荐指数
1
解决办法
3331
查看次数

使用 RxJava retryWhen 运算符时 Kotlin 类型不匹配

我正在尝试创建 Observable,它将在网络连接建立时重试。

我创建了主题:

 private val retrySubject = PublishSubject.create<Unit>()()
Run Code Online (Sandbox Code Playgroud)

我这样使用它:

  private fun publishNetworkReconnection() {
    compositeDisposable?.add(
       connectionHelper.observeConnection()
        .subscribe {connected: Boolean
          if(connected){
          retrySubject.onNext(null)
          }
        }
     )
  }
Run Code Online (Sandbox Code Playgroud)

然后我尝试在 retryWhen 运算符中使用它:

   val disposable =
    Flowable.interval(0, UPDATE_INTERVAL, TimeUnit.SECONDS, Schedulers.io())
        .onBackpressureDrop()
        .flatMapCompletable {
          revocationRepository.sync(event.id)
        }
        .retryWhen { retryHandler -> retryHandler.flatMap({ nothing -> retrySubject.asObservable() }) }
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({ }, { Timber.e(it, "Unable to sync blacklist") })
    compositeDisposable?.add(disposable)
  }
Run Code Online (Sandbox Code Playgroud)

这种情况下该如何正确使用呢?

我在 Android Studio 中收到此错误:

类型不匹配。必填:发布者<< out (???..???) >>! 发现:可观察<无效!>!

kotlin rx-java rx-java2

5
推荐指数
1
解决办法
1893
查看次数

使用 rxjava 正确处理所有类型的 Retrofit 错误

我是新手,我要求最好的正确方法来处理 Retrofit 使用中的所有可能状态,其中Rxjava包括:Retrofitrxjavarxbinding

  1. 没有网络连接。
  2. 来自服务器的空响应。
  3. 成功响应。
  4. 错误响应并显示错误消息,如Username or password is incorrect
  5. 其他错误,例如connection reset by peers.

android retrofit2 rx-binding rx-java2

5
推荐指数
1
解决办法
2489
查看次数

订阅后合并 Observables

是否可以在订阅后发送给另一个merge人,以便他们可以看到结果?ObservableObserver

我知道这行不通,但我的意思是这样的:

observable.subscribe( (i) -> doSomething(i) );
observable.merge(anotherObservable);
Run Code Online (Sandbox Code Playgroud)

然后doSomething也会收到 的值anotherObservable

Observables另外,即使所有合并已经完成,是否有办法继续合并?

rx-android rx-java2

5
推荐指数
0
解决办法
151
查看次数

使用 ThreatLatest、distinctUntilChanged 到 Kotlin 协程的 RxJava 运算符

将此 RxJava 链转换为 Kotlin 协程的最佳方法是什么?

RxTextView
    .textChanges(inputEditText)
    .map { it.toString() }
    .throttleLatest(500, TimeUnit.MILLISECONDS, true)
    .distinctUntilChanged()
    .subscribe({ text ->
        // More logic here
    }, { error ->
        // Error handling
    })
Run Code Online (Sandbox Code Playgroud)

android kotlin rx-java2 kotlin-coroutines

5
推荐指数
1
解决办法
574
查看次数

Lateinit 属性数据尚未初始化

使用 Retrofit2 和 rxjava2 未在 Recyclerview 中设置 Gson Convertable 数据,然后通过其订阅给出错误:

UninitializedPropertyAccessException: lateinit property data has not been initialized
Run Code Online (Sandbox Code Playgroud)

通过retrofit2和rxjava2解析JSON数据。解析GSON数据转换GSon时,rxjava2订阅数据然后给出lateinit属性错误并且它没有在recyclerview中设置。

MainActivity.kt

class Company : AppCompatActivity() {


    internal lateinit var api : APIInterface

    var compositeDisposable = CompositeDisposable()



    internal lateinit var companyDialog : Dialog

    internal lateinit var adapter: CompanyAdapter

    internal lateinit var data : List<Company>

    internal lateinit var rvCompany : RecyclerView

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_company)

        companyDialog = Dialog(this)

        //companyAdapter = CompanyAdapter()


        btnSelectCompany.setOnClickListener{
            showCompanyPopupView()
        }



    }

    fun showCompanyPopupView(){
        companyDialog.setContentView(R.layout.compny_popup_screen)

         rvCompany  = …
Run Code Online (Sandbox Code Playgroud)

kotlin android-recyclerview retrofit2 rx-java2

5
推荐指数
1
解决办法
2万
查看次数

如何摆脱嵌套的 RxJava 流?

我对互联网、数据库进行了一系列调用,结果我向用户显示收集到的信息。现在我有非常难看的三层嵌套 RxJava 流。我真的想让它变得流畅且易于阅读,但我真的很难坚持下去。

我已经阅读了有关 Map、flatMap、zip 等的所有内容。不能让这些东西一起工作。

代码:进行api调用。接收到的信息放入数据库中,在第一个流的 onSuccess 方法中订阅另一个流,并在第二个流的 onSuccess 方法中从数据库接收到的信息最终显示给用户。

达特·弗兰肯斯坦:

disposables.add(modelManager.apiCall()
                .subscribeOn(Schedulers.io())
                .observeOn(mainThread)
                .subscribeWith(new DisposableSingleObserver {

                  public void onSuccess(ApiResponse apiResponse) {

                        modelManager.storeInDatabase(apiResponse)
                       //level 1 nested stream:
                        disposables.add(modelManager.loadFromDatabas()
                                  .subscribeOn(Schedulers.io())
                                  .observeOn(mainThread)
                                  .subscribeWith(new DisposableSingleObserver{
                                    public void onSuccess(Data data) {
                                        view.showData(data);
                                    }
                                    public void onError(Throwable e) {
                                    }
                                }));
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                }));
    }
Run Code Online (Sandbox Code Playgroud)

java android retrofit rx-java2

5
推荐指数
1
解决办法
1082
查看次数

如何修复映射器函数返回空值。”Java/RxJava2 Null 中的错误?

我对 RxJava2 和流(Observable 和 Observer)有一些问题实际上,我的流返回一个错误

The mapper function returned a null value.
io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:57)
retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:51)
retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37)
retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:47)
Run Code Online (Sandbox Code Playgroud)

我的 logcat 不显示代码中断的位置。


 public static Observable<List<Business.Response.Doc>> streamArticles_Business() {
        NYT_TopStories_services nyt_services = NYT_TopStories_services.retrofit.create(NYT_TopStories_services.class);
        return nyt_services.getArticles_Business()
                .map(Business.Response::getDocs)
                .filter(str -> str != null)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .timeout(20, TimeUnit.SECONDS);
    }

Run Code Online (Sandbox Code Playgroud)

和我的改造得到

 @GET("search/v2/articlesearch.json?q=business&api-key=" + apiKey)
    Observable<Business.Response> getArticles_Business();


    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl("https://api.nytimes.com/svc/")
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .build();


Run Code Online (Sandbox Code Playgroud)

你有办法解决这个问题吗?

null stream mapper rx-java2

5
推荐指数
1
解决办法
9441
查看次数

我是否应该在使用 Retrofit 执行的每个请求中传递我的 APIKEY?

我想知道将 APIKEY 放入所有 REST 请求中的最佳方法,而不必将其添加到请求的参数中。

目前我只接到了几个电话,但我正在尝试进一步了解。

@GET(".")
fun getSearch(@Query("s") text: String, @Query("apikey") APIKEY: String) : Observable<ResponseSearch>
Run Code Online (Sandbox Code Playgroud)

我想知道是否有办法不在每次调用的变量中包含 APIKEY

api-key retrofit2 rx-java2

5
推荐指数
1
解决办法
2807
查看次数

Android 将特定 API 调用限制为 3 秒内一次

我想限制/testAPI 调用每 3 秒调用一次,例如:

2021-09-21 14:09:19.920 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.031 V/OkHttp: <-- 200 https://xxx/test (109ms)
2021-09-21 14:09:20.038 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.136 V/OkHttp: <-- 200 https://xxx/test (96ms)
2021-09-21 14:09:20.146 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.315 V/OkHttp: <-- 200 https://xxx/test (168ms)
2021-09-21 14:09:20.325 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.499 V/OkHttp: <-- 200 https://xxx/test (172ms)
2021-09-21 14:09:20.514 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.615 V/OkHttp: <-- 200 https://xxx/test (100ms)
2021-09-21 14:09:20.628 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.721 V/OkHttp: …
Run Code Online (Sandbox Code Playgroud)

android retrofit2 rx-java2

5
推荐指数
1
解决办法
477
查看次数