标签: rx-java2

RxJava 2.0 - 如何将Observable转换为Publisher

如何在RxJava版本2中将Observable转换为Publisher?

在第一个版本中,我们有https://github.com/ReactiveX/RxJavaReactiveStreams项目,它完全符合我的需要.但是我怎么能在RxJava 2中做到这一点?

java reactive-programming reactive-streams reactivex rx-java2

14
推荐指数
1
解决办法
9249
查看次数

无法在rxjava 2中解析方法Observable.from

在rxjava 1中的Observable类中有一个from方法但在rxjava 2中找不到.如何在以下代码中替换rxjava 2中的from方法:

    List<Integer> ints = new ArrayList<>();
    for (int i=1; i<10; i++) {
        ints.add(new Integer(i));
    }
    Observable.just(ints)
            .flatMap(new Function<List<Integer>, Observable<Integer>>() {
                @Override
                public Observable<Integer> apply(List<Integer> ints) {
                    return Observable.from(ints);
                }
            })
Run Code Online (Sandbox Code Playgroud)

rx-java rx-java2

14
推荐指数
3
解决办法
6665
查看次数

是否建议在订阅完成后立即调用Disposable.dispose()?

我有一个Activity我正在创建和订阅Single该类的多个实例(每个在一个单独的后台线程中做一些工作).对于每个订阅,我将Disposable创建的CompositeDisposable实例添加到作用域的实例Activity.当Activity被销毁时,我正在调用CompositeDisposable.clear()方法来处理所有订阅Activity.这当然意味着所有Disposable实例(包括那些已经完成工作的订阅的实例)都会在我的实例中出现,Activity直到Activity被销毁为止.

这是可以的,还是我应该Disposable.dispose()在每次特定Single实例完成工作时(即SingleObserver接收onSuccessonError回调时)调用每个单独的订阅?后一种方法的问题在于我必须跟踪哪个Disposable与哪个相关SingleObserver(这使得使用点失效CompositeDisposable).每次Single实例完成工作时,是否有更好的方法来处理订阅?

android rx-java2

14
推荐指数
1
解决办法
5338
查看次数

RxJava Scheduler在主线程上观察

如果我写这样的事情,那么这两个操作和通知将是对当前线程...

Observable.fromCallable(() -> "Do Something")
    .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

如果我在后台线程这样的操作,那么这两个操作和通知将在后台线程...

Observable.fromCallable(() -> "Do Something")
    .subscribeOn(Schedulers.io())
    .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

如果我想在主线程上观察并在Android中执行后台我会做...

Observable.fromCallable(() -> "Do Something")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

但是如果我正在编写一个标准的Java程序,那么你想在主线程上观察到的状态是什么呢?

java rx-java rx-java2

14
推荐指数
2
解决办法
5889
查看次数

如何使用Android Room和RxJava 2没有结果?

我有数据库与表联系,我想检查是否有联系某些电话号码.

@Query("SELECT * FROM contact WHERE phone_number = :number")
Flowable<Contact> findByPhoneNumber(int number);
Run Code Online (Sandbox Code Playgroud)

我有RxJava 2 Composite一次性声明,以检查是否有电话号码联系.

disposable.add(Db.with(context).getContactsDao().findByPhoneNumber(phoneNumber)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableSubscriber<Contact>() {
                @Override
                public void onNext(final Contact contact) {
                    Log.d("TAG", "phone number fined");
                    Conversation conversation;
                    if(contact != null){
                        conversation = Db.with(context).getConversationsDao().findBySender(contact.getContactId());
                        if(conversation != null){
                            conversation.setUpdatedAt(Utils.getDateAndTimeNow());
                            saveConversation(contact, conversation, context, text, phoneNumber, false);
                        } else {
                            conversation = getConversation(contact, contact.getPhoneNumber());
                            saveConversation(contact, conversation, context, text, phoneNumber, true);
                        }
                    } else {
                        conversation = Db.with(context).getConversationsDao().findByPhone(phoneNumber);
                        if(conversation != null){
                            conversation.setUpdatedAt(Utils.getDateAndTimeNow());
                            saveConversation(contact, conversation, context, text, phoneNumber, false); …
Run Code Online (Sandbox Code Playgroud)

android rx-java2 android-room

14
推荐指数
3
解决办法
9344
查看次数

RxJava.顺序执行

在我的Android应用程序中,我有一个处理用户交互的演示者,包含一种请求管理器,如果需要,还可以通过请求管理器向请求管理器发送用户输入.

请求管理器本身包含服务器API并使用此RxJava处理服务器请求.

我有一个代码,每次用户输入消息并向服务器显示响应时,它会向服务器发送请求:

private Observable<List<Answer>> sendRequest(String request) {
    MyRequest request = new MyRequest();
    request.setInput(request);
    return Observable.fromCallable(() -> serverApi.process(request))
            .doOnNext(myResponse -> {
                // store some data
            })
            .map(MyResponse::getAnswers)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread());
}
Run Code Online (Sandbox Code Playgroud)

但是现在我需要有一种排队.用户可以在服务器响应之前发送新消息.应按顺序处理队列中的每条消息.即,在我们得到对第一条消息的响应后,将发送第二条消息,依此类推.

如果发生错误,则不应处理进一步的请求.

我还需要在RecyclerView中显示答案.

我不知道如何更改上面的代码来实现上述处理

我看到了一些问题.一方面,该队列可以由用户随时更新,另一方面,任何时候服务器发送响应消息应该从队列中删除.

也许有一个rxjava运算符或我错过的特殊方式.

我在这里看到了类似的答案,但是,"队列"是不变的. 使用RxJava和Retrofit进行N次连续api调用

我会非常感谢任何解决方案或链接

android rx-java rx-java2

14
推荐指数
2
解决办法
4484
查看次数

RxJava Single.just()vs Single.fromCallable()?

我想知道是否有人可以对这个问题有所了解,何时使用

__CODE__

代替

__CODE__


来自文档Single.formcallable

Single.fromcallable(()-> myObject)
Run Code Online (Sandbox Code Playgroud)

和Single.just的文件

Single.Just(myObject)
Run Code Online (Sandbox Code Playgroud)

java android rx-java2

14
推荐指数
2
解决办法
7735
查看次数

如何使用Java(非Kotlin)在Android中使用RxJava管理状态

我试图根据杰克沃顿提出的以下谈话开发一个Android应用程序

The State of Managing State with RxJava
21 March 2017 – Devoxx (San Jose, CA, USA)
Run Code Online (Sandbox Code Playgroud)

杰克答应了我无法找到的第2部分和/或GITHUB示例(如果确实存在)

在高层次上,我可以关注/理解上述谈话的大部分内容.

但是我有以下问题.

我可以看到如何使用UiEvent,UiModel,Action和Result来保持关注点.

令我困惑的是以下几点: -

幻灯片194上的图表显示了Observables的"流/流"

Android Device -----> Observable<UiEvent> -----> <application code> -----> Observable<Action>  -----> {Backend}
{Backend}      -----> Observable<Result>  -----> <application code> -----> Observable<UiModel> -----> Android Device
Run Code Online (Sandbox Code Playgroud)

幻灯片210包含此代码片段,显示结果流如何"扫描"到UiModel中

SubmitUiModel initialState = SubmitUiModel.idle();
Observable<Result> results = /* ... */;
Observable<SubmitUiModel> uiModels = results.scan(initialState, (state, result) -> {
if (result == CheckNameResult.IN_FLIGHT
|| result == SubmitResult.IN_FLIGHT)
return SubmitUiModel.inProgress();
if (result == CheckNameResult.SUCCESS)
return …
Run Code Online (Sandbox Code Playgroud)

android rx-java2

13
推荐指数
1
解决办法
972
查看次数

RxAndroid和Retrofit:无法为io.reactivex.Observable创建调用适配器<retrofit2.Response <okhttp3.ResponseBody >>

我正在尝试使用rxJava,rxAndroid,Retrofit2和OkHTTP3从URL端点下载文件.我的代码无法为"Observable <retrofit2.Response <okhttp3.ResponseBody >>"创建调用适配器.这些方法对我来说都是新的,所以我相信我在这里错过了一个重要的概念.非常感谢任何方向或点.

致命异常:主要过程:com.example.khe11e.rxdownloadfile,PID:14130 java.lang.IllegalArgumentException异常:无法创建在retrofit2.ServiceMethod $ Builder.methodError方法RetrofitInterface.downloadFileByUrlRx为io.reactivex.Observable呼叫适配器>(ServiceMethod的.java:720)在retrofit2.ServiceMethod $ Builder.createCallAdapter(ServiceMethod.java:234)在retrofit2.ServiceMethod $ Builder.build(ServiceMethod.java:160)在retrofit2.Retrofit.loadServiceMethod(Retrofit.java:166)在retrofit2 .Retrofit $ 1.invoke(Retrofit.java:145)在java.lang.reflect.Proxy.invoke(Proxy.java:393)在$ Proxy0.downloadFileByUrlRx(来源不明)在com.example.khe11e.rxdownloadfile.MainActivity.downloadImage (MainActivity.java:46)在com.example.khe11e.rxdownloadfile.MainActivity $ 1.onClick(MainActivity.java:39)在android.view.View.performClick(View.java:5207)在android.view.View $ PerformClick .run(View.java:21168)在android.os.Handler.dispatchMe的android.os.Handler.handleCallback(Handler.java:746)ssage(Handler.java:95)在android.app.Looper.loop(Looper.java:148)的android.app.ActivityThread.main(ActivityThread.java:5491)at java.lang.reflect.Method.invoke(Native java.lang.IllegalArgumentException异常:方法)在com.android.internal.os.ZygoteInit $ MethodAndArgsCaller.run(ZygoteInit.java:728)在com.android.internal.os.ZygoteInit.main(ZygoteInit.java:618)所致:找不到io.reactivex.Observable>的调用适配器.尝试:*retrofit2.adapter.rxjava.RxJavaCallAdapterFactory*retrofit2.ExecutorCallAdapterFactory在retrofit2.Retrofit.nextCallAdapter(Retrofit.java:237)在retrofit2.Retrofit.callAdapter(Retrofit.java:201)在retrofit2.ServiceMethod $ Builder.createCallAdapter(ServiceMethod .java:232)......还有16个

的build.gradle:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.4'
compile 'com.squareup.retrofit2:retrofit:2.1.0'
compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'
Run Code Online (Sandbox Code Playgroud)

RetrofitInterface.java:

package com.example.khe11e.rxdownloadfile;
import io.reactivex.Observable;
import okhttp3.ResponseBody;
import retrofit2.Call;
import retrofit2.Response;
import retrofit2.http.GET;
import retrofit2.http.Streaming;
import retrofit2.http.Url;

public interface RetrofitInterface {
    // Retrofit 2 GET request for rxjava
    @Streaming
    @GET
    Observable<Response<ResponseBody>> downloadFileByUrlRx(@Url String fileUrl);
}
Run Code Online (Sandbox Code Playgroud)

MainActivity.java:

package com.example.khe11e.rxdownloadfile;

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle; …
Run Code Online (Sandbox Code Playgroud)

java android rx-java retrofit2 rx-java2

12
推荐指数
2
解决办法
1万
查看次数

mapper函数返回null值

我为调试和发布设置了相同的构建类型,

buildTypes {
    debug {
        buildConfigField "String", "API_BASE_URL", "\"https://www.testUrl.com/api/\""
        minifyEnabled true
        shrinkResources true
        proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
        signingConfig signingConfigs.release_key
    }
    release {
        buildConfigField "String", "API_BASE_URL", "\"https://www.testUrl.com/api/\""
        minifyEnabled true
        shrinkResources true
        proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
        signingConfig signingConfigs.release_key
    }
}
Run Code Online (Sandbox Code Playgroud)

但是,如果我使用该版本构建,我会收到以下错误.此外,服务器响应完全相同.

W/System.err: java.lang.NullPointerException: The mapper function returned a null value.
W/System.err:     at b.a.e.b.b.a(Unknown Source)
W/System.err:     at b.a.e.e.b.bs$a.onNext(Unknown Source)
W/System.err:     at b.a.e.e.b.cm$a.onNext(Unknown Source)
W/System.err:     at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(Unknown Source)
W/System.err:     at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(Unknown Source)
W/System.err:     at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(Unknown Source)
W/System.err:     at b.a.l.subscribe(Unknown Source)
W/System.err:     at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(Unknown Source)
W/System.err:     at …
Run Code Online (Sandbox Code Playgroud)

java android rx-java retrofit2 rx-java2

12
推荐指数
1
解决办法
9660
查看次数